diff --git a/apprise/asset.py b/apprise/asset.py index aef90f08..f9399a77 100644 --- a/apprise/asset.py +++ b/apprise/asset.py @@ -149,6 +149,12 @@ class AppriseAsset: # if Persistent Storage was set to `memory` pgp_autogen = True + # Automatically generate our Privacy Enhanced Mail (PEM) keys if one isn't + # present and our environment configuration allows for it. + # For example, a case where the environment wouldn't allow for it would be + # if Persistent Storage was set to `memory` + pem_autogen = True + # For more detail see CWE-312 @ # https://cwe.mitre.org/data/definitions/312.html # diff --git a/apprise/exception.py b/apprise/exception.py index 216e5cc7..8d7f86eb 100644 --- a/apprise/exception.py +++ b/apprise/exception.py @@ -53,6 +53,14 @@ class AppriseDiskIOError(AppriseException): super().__init__(message, error_code=error_code) +class AppriseInvalidData(AppriseException): + """ + Thrown when bad data was passed into an internal function + """ + def __init__(self, message, error_code=errno.EINVAL): + super().__init__(message, error_code=error_code) + + class AppriseFileNotFound(AppriseDiskIOError, FileNotFoundError): """ Thrown when a persistent write occured in MEMORY mode diff --git a/apprise/plugins/vapid/__init__.py b/apprise/plugins/vapid/__init__.py new file mode 100644 index 00000000..d22920af --- /dev/null +++ b/apprise/plugins/vapid/__init__.py @@ -0,0 +1,585 @@ +# -*- coding: utf-8 -*- +# BSD 2-Clause License +# +# Apprise - Push Notification Library. +# Copyright (c) 2025, Chris Caron +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import os +import requests +from itertools import chain +from json import dumps +from ..base import NotifyBase +from ...common import NotifyType +from ...common import NotifyImageSize +from ...common import PersistentStoreMode +from ...utils.parse import parse_list, parse_bool, is_email +from ...utils.base64 import base64_urlencode +from ...utils import pem as _pem +from ...locale import gettext_lazy as _ +import time + +# Default our global support flag +NOTIFY_VAPID_SUPPORT_ENABLED = False + +try: + from . import subscription + + NOTIFY_VAPID_SUPPORT_ENABLED = True + +except ImportError: + # cryptography is the dependency of the .subscription library + pass + + +class VapidPushMode: + """ + Supported Vapid Push Services + """ + CHROME = 'chrome' + FIREFOX = 'firefox' + EDGE = 'edge' + OPERA = 'opera' + + +VAPID_API_LOOKUP = { + VapidPushMode.CHROME: + 'https://fcm.googleapis.com/fcm/send', + VapidPushMode.FIREFOX: + 'https://updates.push.services.mozilla.com/wpush/v1', + VapidPushMode.EDGE: + 'https://fcm.googleapis.com/fcm/send', # Edge uses FCM too + VapidPushMode.OPERA: + 'https://fcm.googleapis.com/fcm/send', # Opera is Chromium-based +} + +VAPID_PUSH_MODES = ( + VapidPushMode.CHROME, + VapidPushMode.FIREFOX, + VapidPushMode.EDGE, + VapidPushMode.OPERA, +) + + +class NotifyVapid(NotifyBase): + """ + A wrapper for WebPush/Vapid notifications + """ + # Set our global enabled flag + enabled = NOTIFY_VAPID_SUPPORT_ENABLED and _pem.PEM_SUPPORT + + requirements = { + # Define our required packaging in order to work + 'packages_required': 'cryptography' + } + + # The default descriptive name associated with the Notification + service_name = 'Vapid Web Push Notifications' + + # The services URL + service_url = \ + 'https://datatracker.ietf.org/doc/html/draft-thomson-webpush-vapid' + + # The default protocol + secure_protocol = 'vapid' + + # A URL that takes you to the setup/help of the specific protocol + setup_url = 'https://github.com/caronc/apprise/wiki/Notify_vapid' + + # There is no reason we should exceed 5KB when reading in a PEM file. + # If it is more than this, then it is not accepted. + max_vapid_keyfile_size = 5000 + + # There is no reason we should exceed 5MB when reading in a JSON file. + # If it is more than this, then it is not accepted. + max_vapid_subfile_size = 5242880 + + # The maximum length of the body + body_maxlen = 1024 + + # Our default is to no not use persistent storage beyond in-memory + # reference; this allows us to auto-generate our config if needed + storage_mode = PersistentStoreMode.AUTO + + # 43200 = 12 hours + vapid_jwt_expiration_sec = 43200 + + # Subscription file + vapid_subscription_file = 'subscriptions.json' + + # Allows the user to specify the NotifyImageSize object + image_size = NotifyImageSize.XY_72 + + # Define object templates + templates = ( + '{schema}://{subscriber}', + '{schema}://{subscriber}/{targets}', + ) + + # Define our template tokens + template_tokens = dict(NotifyBase.template_tokens, **{ + 'subscriber': { + 'name': _('API Key'), + 'type': 'string', + 'private': True, + 'required': True, + }, + 'targets': { + 'name': _('Targets'), + 'type': 'list:string', + }, + }) + + # Define our template args + template_args = dict(NotifyBase.template_tokens, **{ + 'mode': { + 'name': _('Mode'), + 'type': 'choice:string', + 'values': VAPID_PUSH_MODES, + 'default': VAPID_PUSH_MODES[0], + 'map_to': 'mode', + }, + # Default Time To Live (defined in seconds) + # 0 (Zero) - message will be delivered only if the device is reacheable + 'ttl': { + 'name': _('ttl'), + 'type': 'int', + 'default': 0, + 'min': 0, + 'max': 60, + }, + 'to': { + 'alias_of': 'targets', + }, + 'from': { + 'alias_of': 'subscriber', + }, + 'keyfile': { + # A Private Keyfile is required to sign header + 'name': _('PEM Private KeyFile'), + 'type': 'string', + 'private': True, + }, + 'subfile': { + # A Subscripion File is required to sign header + 'name': _('Subscripion File'), + 'type': 'string', + 'private': True, + }, + 'image': { + 'name': _('Include Image'), + 'type': 'bool', + 'default': True, + 'map_to': 'include_image', + }, + }) + + def __init__(self, subscriber, mode=None, targets=None, keyfile=None, + subfile=None, include_image=None, ttl=None, **kwargs): + """ + Initialize Vapid Messaging + + """ + super().__init__(**kwargs) + + # Path to our Private Key file + self.keyfile = None + + # Path to our subscription.json file + self.subfile = None + + # + # Our Targets + # + self.targets = [] + self._invalid_targets = [] + + # default subscriptions + self.subscriptions = {} + self.subscriptions_loaded = False + + # Set our Time to Live Flag + if ttl is None: + self.ttl = self.template_args['ttl']['default'] + + else: + try: + self.ttl = int(ttl) + + except (ValueError, TypeError): + # Do nothing + pass + + if self.ttl < self.template_args['ttl']['min'] or \ + self.ttl > self.template_args['ttl']['max']: + msg = 'The Vapid TTL specified ({}) is out of range.'\ + .format(self.ttl) + self.logger.warning(msg) + raise TypeError(msg) + + # Place a thumbnail image inline with the message body + self.include_image = \ + self.template_args['image']['default'] \ + if include_image is None else include_image + + result = is_email(subscriber) + if not result: + msg = 'An invalid Vapid Subscriber' \ + '({}) was specified.'.format(subscriber) + self.logger.warning(msg) + raise TypeError(msg) + self.subscriber = result['full_email'] + + # Store our Mode/service + try: + self.mode = \ + NotifyVapid.template_args['mode']['default'] \ + if mode is None else mode.lower() + + if self.mode not in VAPID_PUSH_MODES: + # allow the outer except to handle this common response + raise + except: + # Invalid region specified + msg = 'The Vapid mode specified ({}) is invalid.' \ + .format(mode) + self.logger.warning(msg) + raise TypeError(msg) + + # Our Private keyfile + self.keyfile = keyfile + + # Our Subscription file + self.subfile = subfile + + # Prepare our PEM Object + self.pem = _pem.ApprisePEMController(self.store.path, asset=self.asset) + + # Create our subscription object + self.subscriptions = subscription.WebPushSubscriptionManager( + asset=self.asset) + + if self.subfile is None and \ + self.store.mode != PersistentStoreMode.MEMORY and \ + self.asset.pem_autogen: + + self.subfile = os.path.join( + self.store.path, self.vapid_subscription_file) + if not os.path.exists(self.subfile) and \ + self.subscriptions.write(self.subfile): + self.logger.info( + 'Vapid auto-generated %s/%s', + os.path.basename( + self.store.path, + self.vapid_subscription_file)) + + # Acquire our targets for parsing + self.targets = parse_list(targets) + if not self.targets: + # Add ourselves + self.targets.append(self.subscriber) + + return + + def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs): + """ + Perform Vapid Notification + """ + + if not self.pem and not self.pem.load_private_key(self.keyfile): + self.logger.warning( + 'Provided Vapid/WebPush (PEM) Private Key file could ' + 'not be loaded.') + return False + + if not self.targets: + # There is no one to notify; we're done + self.logger.warning('There are no Vapid targets to notify') + return False + + if not self.subscriptions_loaded and self.subfile: + # Toggle our loaded flag to prevent trying again later + self.subscriptions_loaded = True + if not self.subscriptions.load( + self.subfile, byte_limit=self.max_vapid_subfile_size): + self.logger.warning( + 'Provided Vapid/WebPush subscriptions file could not be ' + 'loaded.') + return False + + if not self.subscriptions: + self.logger.warning('Vapid could not load subscriptions') + return False + + if not self.pem: + self.logger.warning( + 'No Vapid/WebPush (PEM) Private Key file could be loaded.') + return False + + # Prepare our notify URL (based on our mode) + notify_url = VAPID_API_LOOKUP[self.mode] + + jwt_token = self.jwt_token + if not jwt_token: + self.logger.warning( + 'A Vapid JWT Token could not be generated') + return False + + headers = { + 'User-Agent': self.app_id, + "TTL": str(self.ttl), + "Content-Encoding": "aes128gcm", + "Content-Type": "application/octet-stream", + "Authorization": f"vapid t={jwt_token}, k={self.public_key}", + } + + has_error = False + + # Create a copy of the targets list + targets = list(self.targets) + while len(targets): + target = targets.pop(0) + if target not in self.subscriptions: + self.logger.warning( + 'Dropped Vapid user ' + '(%s) specified - not found in subscriptions.json.' % + target, + ) + # Save ourselves from doing this again + self._invalid_targets.append(target) + self.targets.remove(target) + has_error = True + continue + + # Encrypt our payload + encrypted_payload = self.pem.encrypt_webpush( + body, + public_key=self.subscriptions[target].public_key, + auth_secret=self.subscriptions[target].auth_secret) + + self.logger.debug( + 'Vapid %s POST URL: %s (cert_verify=%r)', + self.mode, notify_url, self.verify_certificate, + ) + self.logger.debug( + 'Vapid %s Encrypted Payload: %d byte(s)', self.mode, len(body)) + + # Always call throttle before any remote server i/o is made + self.throttle() + try: + r = requests.post( + notify_url, + data=encrypted_payload, + headers=headers, + verify=self.verify_certificate, + timeout=self.request_timeout, + ) + if r.status_code not in ( + requests.codes.ok, requests.codes.no_content): + # We had a problem + status_str = \ + NotifyBase.http_response_code_lookup(r.status_code) + + self.logger.warning( + 'Failed to send {} Vapid notification: ' + '{}{}error={}.'.format( + self.mode, + status_str, + ', ' if status_str else '', + r.status_code)) + + self.logger.debug( + 'Response Details:\r\n%s', r.content) + + has_error = True + + else: + self.logger.info('Sent %s Vapid notification.', self.mode) + + except requests.RequestException as e: + self.logger.warning( + 'A Connection error occurred sending Vapid ' + 'notification.' + ) + self.logger.debug('Socket Exception: %s', str(e)) + + has_error = True + + return not has_error + + @property + def url_identifier(self): + """ + Returns all of the identifiers that make this URL unique from + another simliar one. Targets or end points should never be identified + here. + """ + return (self.secure_protocol, self.mode, self.subscriber) + + def url(self, privacy=False, *args, **kwargs): + """ + Returns the URL built dynamically based on specified arguments. + """ + + # Define any URL parameters + params = { + 'mode': self.mode, + 'ttl': str(self.ttl), + } + + if self.keyfile: + # Include our keyfile if specified + params['keyfile'] = self.keyfile + + if self.subfile: + # Include our subfile if specified + params['subfile'] = self.subfile + + # Extend our parameters + params.update(self.url_parameters(privacy=privacy, *args, **kwargs)) + + targets = self.targets if not ( + self.targets == 1 and + self.targets[0].lower() == self.subscriber.lower()) else [] + return '{schema}://{subscriber}/{targets}?{params}'.format( + schema=self.secure_protocol, + subscriber=NotifyVapid.quote(self.subscriber, safe='@'), + targets='/'.join(chain( + [str(t) for t in targets], + [NotifyVapid.quote(x, safe='@') + for x in self._invalid_targets])), + params=NotifyVapid.urlencode(params), + ) + + def __len__(self): + """ + Returns the number of targets associated with this notification + """ + targets = len(self.targets) + return targets if targets else 1 + + @staticmethod + def parse_url(url): + """ + Parses the URL and returns enough arguments that can allow + us to re-instantiate this object. + + """ + results = NotifyBase.parse_url(url, verify_host=False) + if not results: + # We're done early as we couldn't load the results + return results + + # Prepare our targets + results['targets'] = [] + if 'from' in results['qsd'] and len(results['qsd']['from']): + results['subscriber'] = \ + NotifyVapid.unquote(results['qsd']['from']) + + if results['user'] and results['host']: + # whatever is left on the URL goes + results['targets'].append('{}@{}'.format( + NotifyVapid.unquote(results['user']), + NotifyVapid.unquote(results['host']), + )) + + elif results['host']: + results['targets'].append( + NotifyVapid.unquote(results['host'])) + + else: + # Acquire our subscriber information + results['subscriber'] = '{}@{}'.format( + NotifyVapid.unquote(results['user']), + NotifyVapid.unquote(results['host']), + ) + + results['targets'].extend( + NotifyVapid.split_path(results['fullpath'])) + + # Get our mode + results['mode'] = results['qsd'].get('mode') + + # Get Image Flag + results['include_image'] = \ + parse_bool(results['qsd'].get( + 'image', NotifyVapid.template_args['image']['default'])) + + # The 'to' makes it easier to use yaml configuration + if 'to' in results['qsd'] and len(results['qsd']['to']): + results['targets'] += \ + NotifyVapid.parse_list(results['qsd']['to']) + + # Our Private Keyfile (PEM) + if 'keyfile' in results['qsd'] and results['qsd']['keyfile']: + results['keyfile'] = \ + NotifyVapid.unquote(results['qsd']['keyfile']) + + # Our Subscription File (JSON) + if 'subfile' in results['qsd'] and results['qsd']['subfile']: + results['subfile'] = \ + NotifyVapid.unquote(results['qsd']['subfile']) + + # Support the 'ttl' variable + if 'ttl' in results['qsd'] and len(results['qsd']['ttl']): + results['ttl'] = \ + NotifyVapid.unquote(results['qsd']['ttl']) + + return results + + @property + def jwt_token(self): + """ + Returns our VAPID Token based on class details + """ + # JWT header + header = { + "alg": "ES256", + "typ": "JWT" + } + + # JWT payload + payload = { + "aud": VAPID_API_LOOKUP[self.mode], + "exp": int(time.time()) + self.vapid_jwt_expiration_sec, + "sub": f"mailto:{self.subscriber}" + } + + # Base64 URL encode header and payload + header_b64 = base64_urlencode( + dumps(header, separators=(",", ":")).encode('utf-8')) + payload_b64 = base64_urlencode( + dumps(payload, separators=(",", ":")).encode('utf-8')) + signing_input = f"{header_b64}.{payload_b64}".encode('utf-8') + signature_b64 = base64_urlencode(self.pem.sign(signing_input)) + + # Return final token + return f"{header_b64}.{payload_b64}.{signature_b64}" + + @property + def public_key(self): + """ + Returns our public key representation + """ + return self.pem.x962_str diff --git a/apprise/plugins/vapid/subscription.py b/apprise/plugins/vapid/subscription.py new file mode 100644 index 00000000..50fe49ff --- /dev/null +++ b/apprise/plugins/vapid/subscription.py @@ -0,0 +1,414 @@ +# -*- coding: utf-8 -*- +# BSD 2-Clause License +# +# Apprise - Push Notification Library. +# Copyright (c) 2025, Chris Caron +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import json +from ...asset import AppriseAsset +from ...utils.base64 import base64_urldecode +from ...exception import AppriseInvalidData +from ...apprise_attachment import AppriseAttachment +from cryptography.hazmat.primitives.asymmetric import ec + + +class WebPushSubscription: + """ + WebPush Subscription + """ + # Format: + # { + # "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...", + # "keys": { + # "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...", + # "auth": "k9Xzm43nBGo=", + # } + # } + def __init__(self, content: str | dict | None = None): + """ + Prepares a webpush object provided with content + Content can be a dictionary, or JSON String + """ + + # Our variables + self.__endpoint = None + self.__p256dh = None + self.__auth = None + self.__auth_secret = None + self.__public_key = None + + if content is not None: + if not self.load(content): + raise AppriseInvalidData('Could not load subscription') + + def load(self, content: str | dict | None = None): + """ + Performs the loading/validation of the object + """ + + # Reset our variables + self.__endpoint = None + self.__p256dh = None + self.__auth = None + self.__auth_secret = None + self.__public_key = None + + if isinstance(content, str): + try: + content = json.loads(content) + + except json.decoder.JSONDecodeError: + # Bad data + return False + + if not isinstance(content, dict): + # We could not load he result set + return False + + # Retreive our contents for validation + endpoint = content.get('endpoint') + if not isinstance(endpoint, str): + return False + + try: + p256dh = base64_urldecode(content['keys']['p256dh']) + if not p256dh: + return False + + auth_secret = base64_urldecode(content['keys']['auth']) + if not auth_secret: + return False + + except KeyError: + return False + + try: + # Store our data + self.__public_key = ec.EllipticCurvePublicKey.from_encoded_point( + ec.SECP256R1(), p256dh, + ) + + except ValueError: + # Invalid p256dh key (Can't load Public Key) + return False + + self.__endpoint = endpoint + self.__p256dh = content['keys']['p256dh'] + self.__auth = content['keys']['auth'] + self.__auth_secret = auth_secret + + return True + + def write(self, path: str, indent: int = 2) -> bool: + """ + Writes content to disk based on path specified. Content is a JSON + file, so ideally you may wish to have `.json' as it's extension for + clarity + """ + if not self.__public_key: + return False + + try: + with open(path, 'w', encoding='utf-8') as f: + json.dump(self.dict, f, indent=indent) + + except (TypeError, OSError): + # Could not write content + return False + + return True + + @property + def auth(self) -> str | None: + return self.__auth if self.__public_key else None + + @property + def endpoint(self) -> str | None: + return self.__endpoint if self.__public_key else None + + @property + def p256dh(self) -> str | None: + return self.__p256dh if self.__public_key else None + + @property + def auth_secret(self) -> bytes | None: + return self.__auth_secret if self.__public_key else None + + @property + def public_key(self) -> ec.EllipticCurvePublicKey | None: + return self.__public_key + + @property + def dict(self) -> dict: + return { + "endpoint": self.__endpoint, + "keys": { + "p256dh": self.__p256dh, + "auth": self.__auth, + }, + } if self.__public_key else { + "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123...', + "keys": { + "p256dh": '', + "auth": '', + }, + } + + def json(self, indent: int = 2) -> str: + """ + Returns JSON representation of the object + """ + return json.dumps(self.dict, indent=indent) + + def __bool__(self) -> bool: + """ + handle 'if' statement + """ + return True if self.__public_key else False + + def __str__(self) -> str: + """ + Returns our JSON entry as a string + """ + # Return the first 16 characters of the detected endpoint subscription + # id + return '' if not self.__endpoint \ + else self.__endpoint.split('/')[-1][:16] + + +class WebPushSubscriptionManager: + """ + WebPush Subscription Manager + """ + # Format: + # { + # "name1": { + # "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...", + # "keys": { + # "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...", + # "auth": "k9Xzm43nBGo=", + # } + # }, + # "name2": { + # "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...", + # "keys": { + # "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...", + # "auth": "k9Xzm43nBGo=", + # } + # }, + + # Defines the number of failures we can accept before we abort and assume + # the file is bad + max_load_failure_count = 3 + + def __init__(self, asset=None): + """ + Webpush Subscription Manager + """ + + # Our subscriptions + self.__subscriptions = {} + + # Prepare our Asset Object + self.asset = \ + asset if isinstance(asset, AppriseAsset) else AppriseAsset() + + def __getitem__(self, key: str) -> int: + """ + Returns our indexed value if it exists + """ + return self.__subscriptions[key.lower()] + + def __setitem__(self, name: str, + subscription: WebPushSubscription | str | dict) -> None: + """ + Set's our object if possible + """ + + if not self.add(subscription, name=name.lower()): + raise AppriseInvalidData('Invalid subscription provided') + + def add(self, + subscription: WebPushSubscription | str | dict, + name: str | None = None) -> bool: + """ + Add a subscription into our manager + """ + + if not isinstance(subscription, WebPushSubscription): + try: + # Support loading our object + subscription = WebPushSubscription(subscription) + + except AppriseInvalidData: + return True + + if name is None: + name = str(subscription) + + self.__subscriptions[name.lower()] = subscription + return True + + def __bool__(self) -> bool: + """ + True is returned if at least one subscription has been loaded. + """ + return True if self.__subscriptions else False + + def __len__(self) -> int: + """ + Returns the number of servers loaded; this includes those found within + loaded configuration. This funtion nnever actually counts the + Config entry themselves (if they exist), only what they contain. + """ + return len(self.__subscriptions) + + def __iadd__(self, subscription: WebPushSubscription | + str | dict) -> "WebPushSubscriptionManager": + + if not self.add(subscription): + raise AppriseInvalidData('Invalid subscription provided') + + return self + + def __contains__(self, key: str) -> bool: + """ + Checks if the key exists + """ + return key.lower() in self.__subscriptions + + def clear(self) -> None: + """ + Empties our server list + + """ + self.__subscriptions.clear() + + @property + def dict(self) -> dict: + """ + Returns a dictionary of all entries + """ + return {k: v.dict for k, v in self.__subscriptions.items()} \ + if self.__subscriptions else {} + + def load(self, path: str, byte_limit=0) -> bool: + """ + Writes content to disk based on path specified. Content is a JSON + file, so ideally you may wish to have `.json' as it's extension for + clarity + + if byte_limit is zero, then we do not limit our file size, otherwise + set this to the bytes you want to restrict yourself by + """ + + # Reset our object + self.clear() + + # Create our attachment object + attach = AppriseAttachment(asset=self.asset) + + # Add our path + attach.add(path) + + if byte_limit > 0: + # Enforce maximum file size + attach[0].max_file_size = byte_limit + + if not path: + return False + + try: + # Otherwise open our path + with open(attach[0].path, 'r', encoding='utf-8') as f: + content = json.load(f) + + except (TypeError, OSError): + # Could not read + return False + + # Verify if we're dealing with a single element: + # { + # "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...", + # "keys": { + # "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...", + # "auth": "k9Xzm43nBGo=", + # } + # } + # + # or if we're dealing with a multiple set + # + # { + # "name1": { + # "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...", + # "keys": { + # "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...", + # "auth": "k9Xzm43nBGo=", + # } + # }, + # "name2": { + # "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...", + # "keys": { + # "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...", + # "auth": "k9Xzm43nBGo=", + # } + # }, + + error_count = 0 + if 'endpoint' in content and 'keys' in content: + if not self.add(content): + return False + + else: + for name, subscription in content.items(): + if not self.add(subscription, name=name.lower()): + error_count += 1 + if error_count > self.max_load_failure_count: + self.clear() + return False + + return True + + def write(self, path: str, indent: int = 2) -> bool: + """ + Writes content to disk based on path specified. Content is a JSON + file, so ideally you may wish to have `.json' as it's extension for + clarity + """ + try: + with open(path, 'w', encoding='utf-8') as f: + json.dump(self.dict, f, indent=indent) + + except (TypeError, OSError): + # Could not write content + return False + + return True + + def json(self, indent: int = 2) -> str: + """ + Returns JSON representation of the object + """ + return json.dumps(self.dict, indent=indent) diff --git a/apprise/utils/base64.py b/apprise/utils/base64.py index e5a8a568..33bb4934 100644 --- a/apprise/utils/base64.py +++ b/apprise/utils/base64.py @@ -32,6 +32,33 @@ import typing import base64 +def base64_urlencode(data: bytes) -> str: + """ + URL Safe Base64 Encoding + """ + try: + return base64.urlsafe_b64encode(data).rstrip(b'=').decode('utf-8') + + except TypeError: + # data is not supported; avoid raising exception + return None + + +def base64_urldecode(data: str) -> bytes: + """ + URL Safe Base64 Encoding + """ + + try: + # Normalize base64url string (remove padding, add it back) + padding = '=' * (-len(data) % 4) + return base64.urlsafe_b64decode(data + padding) + + except TypeError: + # data is not supported; avoid raising exception + return None + + def decode_b64_dict(di: dict) -> dict: """ decodes base64 dictionary previously encoded diff --git a/apprise/utils/pem.py b/apprise/utils/pem.py new file mode 100644 index 00000000..2c74f4e2 --- /dev/null +++ b/apprise/utils/pem.py @@ -0,0 +1,740 @@ +# -*- coding: utf-8 -*- +# BSD 2-Clause License +# +# Apprise - Push Notification Library. +# Copyright (c) 2025, Chris Caron +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import os +import json +import base64 +import struct +from ..utils.base64 import base64_urlencode, base64_urldecode +from ..apprise_attachment import AppriseAttachment +from ..asset import AppriseAsset +from ..logger import logger +from ..exception import ApprisePluginException + +try: + from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives.kdf.hkdf import HKDF + from cryptography.hazmat.primitives import serialization, hashes + from cryptography.hazmat.primitives.ciphers import ( + Cipher, algorithms, modes) + from cryptography.hazmat.primitives.ciphers.aead import AESGCM + from cryptography.hazmat.primitives.asymmetric import ec + from cryptography.hazmat.primitives.serialization import ( + Encoding, + NoEncryption, + PrivateFormat, + PublicFormat, + ) + from cryptography.hazmat.primitives.asymmetric.utils import ( + decode_dss_signature + ) + + # PEM Support enabled + PEM_SUPPORT = True + +except ImportError: + # PEM Support disabled + PEM_SUPPORT = False + + +class ApprisePEMException(ApprisePluginException): + """ + Thrown when there is an error with the PEM Controller + """ + def __init__(self, message, error_code=612): + super().__init__(message, error_code=error_code) + + +class ApprisePEMController: + """ + PEM Controller Tool for the Apprise Library + """ + + # There is no reason a PEM Public Key should exceed 8K in size + # If it is more than this, then it is not accepted + max_pem_public_key_size = 8000 + + # There is no reason a PEM Private Key should exceed 8K in size + # If it is more than this, then it is not accepted + max_pem_private_key_size = 8000 + + # Maximum Vapid Message Size + max_webpush_record_size = 4096 + + def __init__(self, path, pub_keyfile=None, prv_keyfile=None, + name=None, asset=None, **kwargs): + """ + Path should be the directory keys can be written and read from such as + .store.path + + Optionally additionally specify a keyfile to explicitly open + """ + + # Directory we can work with + self.path = path + + # Prepare our Key Placeholders + self.__private_key = None + self.__public_key = None + + # Our name (id) + self.name = name.strip(' \t/-+!$@#*').lower() \ + if isinstance(name, str) else '' + + # Prepare our Asset Object + self.asset = \ + asset if isinstance(asset, AppriseAsset) else AppriseAsset() + + # Our temporary reference points + self._prv_keyfile = AppriseAttachment(asset=self.asset) + self._pub_keyfile = AppriseAttachment(asset=self.asset) + + if prv_keyfile: + self.load_private_key(prv_keyfile) + + elif pub_keyfile: + self.load_public_key(pub_keyfile) + + else: + self._pub_keyfile = None + + def load_private_key(self, path=None, *names): + """ + Load Private key and from that we can prepare our public key + """ + + if path is None: + # Auto-load our content + return True if self.private_keyfile(*names) else False + + # Create ourselves an Attachment to work with; this grants us the + # ability to pull this key from a remote site or anything else + # supported by the Attachment object + self._prv_keyfile = AppriseAttachment(asset=self.asset) + + # Add our definition to our pem_key reference + self._prv_keyfile.add(path) + + # Enforce maximum file size + self._prv_keyfile[0].max_file_size = self.max_pem_private_key_size + + # + # Reset Public key + # + self._pub_keyfile = AppriseAttachment(asset=self.asset) + + # + # Reset our internal keys + # + self.__private_key = None + self.__public_key = None + + if not self._prv_keyfile: + # Early exit + logger.error( + 'Could not access PEM Private Key {}.'.format(path)) + return False + + try: + with open(self._prv_keyfile[0].path, "rb") as f: + self.__private_key = serialization.load_pem_private_key( + f.read(), + password=None, # or provide the password if encrypted + backend=default_backend() + ) + + except FileNotFoundError: + # Generate keys + logger.debug('PEM Private Key file not found: %s', path) + return False + + except OSError as e: + logger.warning('Error accessing PEM Private Key file %s', path) + logger.debug(f'I/O Exception: {e}') + return False + + # + # Generate our public key + # + self.__public_key = self.__private_key.public_key() + + # Load our private key + return True if self.__private_key else False + + def load_public_key(self, path=None, *names): + """ + Load Public key only + + Note: with just a public key you can only decrypt, encryption is not + possible. + """ + + if path is None: + # Auto-load our content + return True if self.public_keyfile(*names) else False + + # Create ourselves an Attachment to work with; this grants us the + # ability to pull this key from a remote site or anything else + # supported by the Attachment object + self._pub_keyfile = AppriseAttachment(asset=self.asset) + + # Add our definition to our pem_key reference + self._pub_keyfile.add(path) + + # Enforce maximum file size + self._pub_keyfile[0].max_file_size = self.max_pem_public_key_size + + # + # Reset Private key + # + self._prv_keyfile = AppriseAttachment(asset=self.asset) + + # + # Reset our internal keys + # + self.__private_key = None + self.__public_key = None + + if not self._pub_keyfile: + # Early exit + logger.error( + 'Could not access PEM Public Key {}.'.format(path)) + return False + + try: + with open(path, 'rb') as key_file: + self.__public_key = serialization.load_pem_public_key( + key_file.read(), + backend=default_backend() + ) + + except FileNotFoundError: + # Generate keys + logger.debug('PEM Public Key file not found: %s', path) + return None + + except OSError as e: + logger.warning('Error accessing PEM Public Key file %s', path) + logger.debug(f'I/O Exception: {e}') + return None + + # Load our private key + return True if self.__public_key else False + + def keygen(self, name=None, force=False): + """ + Generates a set of keys based on name configured. + """ + + if self._pub_keyfile or not self.path: + logger.trace( + 'PEM keygen disabled, reason=%s', + 'keyfile-defined' if not self._pub_keyfile + else 'no-write-path') + return False + + # Create a new private/public key pair + self.__private_key = ec.generate_private_key( + ec.SECP256R1(), default_backend()) + self.__public_key = self.__private_key.public_key() + + # + # Prepare our PEM formatted output files + # + private_key = self.__private_key.private_bytes( + Encoding.PEM, + PrivateFormat.PKCS8, + encryption_algorithm=NoEncryption(), + ) + + public_key = self.__public_key.public_bytes( + encoding=Encoding.PEM, + format=PublicFormat.SubjectPublicKeyInfo, + ) + + if not name: + name = self.name + + file_prefix = '' if not name else f'{name}-' + pub_path = os.path.join(self.path, f'{file_prefix}public_key.pem') + prv_path = os.path.join(self.path, f'{file_prefix}private_key.pem') + + if os.path.isfile(pub_path) and not force: + logger.debug( + 'PEM generation skipped; Public Key already exists: %s', + pub_path) + return True + + try: + # Write our keys to disk + with open(pub_path, 'wb') as f: + f.write(public_key) + + except OSError as e: + logger.warning('Error writing Public PEM file %s', pub_path) + logger.debug(f'I/O Exception: {e}') + + # Cleanup + try: + os.unlink(pub_path) + logger.trace('Removed %s', pub_path) + + except OSError: + pass + + try: + with open(prv_path, 'wb') as f: + f.write(private_key) + + except OSError as e: + logger.warning('Error writing Private PEM file %s', prv_path) + logger.debug(f'I/O Exception: {e}') + + try: + os.unlink(pub_path) + logger.trace('Removed %s', pub_path) + + except OSError: + pass + + try: + os.unlink(prv_path) + logger.trace('Removed %s', prv_path) + + except OSError: + pass + + return False + + logger.info( + 'Wrote Public/Private PEM key pair for %s/%s', + os.path.dirname(pub_path), + os.path.basename(pub_path)) + return True + + def public_keyfile(self, *names): + """ + Returns the first match of a useable public key based names provided + """ + + if not PEM_SUPPORT: + msg = 'PEM Support unavailable; install cryptography library' + logger.warning(msg) + raise ApprisePEMException(msg) + + if self._pub_keyfile: + # If our code reaches here, then we fetch our public key + pem_key = self._pub_keyfile[0] + if not pem_key: + # We could not access the attachment + logger.error( + 'Could not access PEM Public Key {}.'.format( + pem_key.url(privacy=True))) + return False + + return pem_key.path + + elif not self.path: + # No path + return None + + fnames = [ + 'public_key.pem', + 'public.pem', + 'pub.pem', + ] + + if self.name: + # Include our name in the list + fnames = [self.name] + [*names] + + for name in names: + fnames.insert(0, f'{name}-public_key.pem') + + _entry = name.lower() + fnames.insert(0, f'{_entry}-public_key.pem') + + return next( + (os.path.join(self.path, fname) + for fname in fnames + if os.path.isfile(os.path.join(self.path, fname))), + None) + + def private_keyfile(self, *names): + """ + Returns the first match of a useable private key based names provided + """ + + if not PEM_SUPPORT: + msg = 'PEM Support unavailable; install cryptography library' + logger.warning(msg) + raise ApprisePEMException(msg) + + if self._prv_keyfile: + # If our code reaches here, then we fetch our private key + pem_key = self._prv_keyfile[0] + if not pem_key: + # We could not access the attachment + logger.error( + 'Could not access PEM Private Key {}.'.format( + pem_key.url(privacy=True))) + return False + + return pem_key.path + + elif not self.path: + # No path + return None + + fnames = [ + 'private_key.pem', + 'private.pem', + 'prv.pem', + ] + + if self.name: + # Include our name in the list + fnames = [self.name] + [*names] + + for name in names: + fnames.insert(0, f'{name}-private_key.pem') + + _entry = name.lower() + fnames.insert(0, f'{_entry}-private_key.pem') + + return next( + (os.path.join(self.path, fname) + for fname in fnames + if os.path.isfile(os.path.join(self.path, fname))), + None) + + def public_key(self, *names, autogen=None): + """ + Opens a spcified pem public file and returns the key from it which + is used to decrypt the message + """ + if self.__public_key: + return self.__public_key + + path = self.public_keyfile(*names) + if not path: + if (autogen if autogen is not None else self.asset.pem_autogen) \ + and self.keygen(*names): + path = self.public_keyfile(*names) + if path: + # We should get a hit now + return self.public_key(*names) + + logger.warning('No PEM Public Key could be loaded') + return None + + return self.__public_key if ( + self.load_public_key(path) or + # Try to see if we can load a private key (which we can generate a + # public from) + self.private_key(names=names, autogen=autogen)) else None + + def private_key(self, *names, autogen=None): + """ + Opens a spcified pem private file and returns the key from it which + is used to encrypt the message + """ + if self.__private_key: + return self.__private_key + + path = self.private_keyfile(*names) + if not path: + if (autogen if autogen is not None else self.asset.pem_autogen) \ + and self.keygen(*names): + path = self.private_keyfile(*names) + if path: + # We should get a hit now + return self.private_key(*names) + + logger.warning('No PEM Private Key could be loaded') + return None + + return self.__private_key if self.load_private_key(path) else None + + def encrypt_webpush(self, message: str | bytes, + # Information required + public_key: ec.EllipticCurvePublicKey, + auth_secret: bytes) -> bytes: + """ + Encrypt a WebPush message using the recipient's public key and auth + secret. + + Accepts input message as str or bytes. + """ + if isinstance(message, str): + message = message.encode('utf-8') + + # 1. Generate ephemeral EC private/Public key + ephemeral_private_key = \ + ec.generate_private_key(ec.SECP256R1(), default_backend()) + ephemeral_public_key = ephemeral_private_key.public_key().public_bytes( + encoding=Encoding.X962, format=PublicFormat.UncompressedPoint) + + # 2. Random salt + salt = os.urandom(16) + + # 3. Generate shared secret via ECDH + shared_secret = ephemeral_private_key.exchange(ec.ECDH(), public_key) + + # 4. Derive PRK using HKDF (first phase) + recipient_public_key_bytes = public_key.public_bytes( + encoding=Encoding.X962, + format=PublicFormat.UncompressedPoint, + ) + + # 5. Derive Encryption key + hkdf_secret = HKDF( + algorithm=hashes.SHA256(), + length=32, + salt=auth_secret, + info=b"WebPush: info\x00" + + recipient_public_key_bytes + ephemeral_public_key, + backend=default_backend(), + ).derive(shared_secret) + + # 6. Derive Content Encryption Key + hkdf_key = HKDF( + algorithm=hashes.SHA256(), + length=16, + salt=salt, + info=b"Content-Encoding: aes128gcm\x00", + backend=default_backend(), + ).derive(hkdf_secret) + + # 7. Derive Nonce + hkdf_nonce = HKDF( + algorithm=hashes.SHA256(), + length=12, + salt=salt, + info=b"Content-Encoding: nonce\x00", + backend=default_backend(), + ).derive(hkdf_secret) + + # 8. Encrypt the message + aesgcm = AESGCM(hkdf_key) + # RFC8291 requires us to add '\0x02' byte to end of message + ciphertext = aesgcm.encrypt( + hkdf_nonce, message + b"\x02", associated_data=None) + + # 9. Build WebPush header + payload + header = salt + header += struct.pack("!L", self.max_webpush_record_size) + header += struct.pack("!B", len(ephemeral_public_key)) + header += ephemeral_public_key + header += ciphertext + + return header + + def encrypt(self, + message: str | bytes, + public_key: ec.EllipticCurvePublicKey | None = None, + salt: bytes | None = None) -> str | None: + """ + Encrypts a message using the recipient's public key (or self public + key if none provided). Message can be str or bytes. + """ + + # 1. Handle string vs bytes input + if isinstance(message, str): + message = message.encode('utf-8') + + # 2. Select public key + if public_key is None: + public_key = self.public_key() + if public_key is None: + logger.debug("No public key available for encryption.") + return None + + # 3. Generate ephemeral EC private key + ephemeral_private_key = ec.generate_private_key(ec.SECP256R1()) + + # 4. Derive shared secret + shared_secret = ephemeral_private_key.exchange(ec.ECDH(), public_key) + + # 5. Derive symmetric AES key using HKDF + derived_key = HKDF( + algorithm=hashes.SHA256(), + length=32, + salt=salt, # Allow salt=None if not provided + info=b'ecies-encryption', + backend=default_backend() + ).derive(shared_secret) + + # 6. Encrypt the message using AES-GCM + iv = os.urandom(12) # 96-bit random IV for GCM + encryptor = Cipher( + algorithms.AES(derived_key), + modes.GCM(iv), + backend=default_backend() + ).encryptor() + + ciphertext = encryptor.update(message) + encryptor.finalize() + tag = encryptor.tag + + # 7. Serialize ephemeral public key as X9.62 Uncompressed Point + ephemeral_public_key_bytes = \ + ephemeral_private_key.public_key().public_bytes( + encoding=serialization.Encoding.X962, + format=serialization.PublicFormat.UncompressedPoint, + ) + + # 8. Combine everything cleanly + full_payload = { + "ephemeral_pubkey": base64_urlencode(ephemeral_public_key_bytes), + "iv": base64_urlencode(iv), + "tag": base64_urlencode(tag), + "ciphertext": base64_urlencode(ciphertext), + } + + return base64.b64encode( + json.dumps(full_payload).encode('utf-8') + ).decode('utf-8') + + def decrypt(self, + encrypted_payload: str, + private_key: ec.EllipticCurvePrivateKey | None = None, + salt: bytes | None = None) -> str | None: + """ + Decrypts a message using the provided private key or fallback to + self's private key. + + Payload is the base64-encoded JSON from encrypt(). + """ + + # 1. Parse input + if isinstance(encrypted_payload, str): + payload_bytes = base64.b64decode(encrypted_payload.encode('utf-8')) + else: + payload_bytes = base64.b64decode(encrypted_payload) + + payload = json.loads(payload_bytes.decode('utf-8')) + + ephemeral_pubkey_bytes = base64_urldecode(payload["ephemeral_pubkey"]) + iv = base64_urldecode(payload["iv"]) + tag = base64_urldecode(payload["tag"]) + ciphertext = base64_urldecode(payload["ciphertext"]) + + # 2. Select private key + if private_key is None: + private_key = self.private_key() + if private_key is None: + logger.debug("No private key available for decryption.") + return None + + # 3. Load ephemeral public key from sender + ephemeral_pubkey = ec.EllipticCurvePublicKey.from_encoded_point( + ec.SECP256R1(), + ephemeral_pubkey_bytes + ) + + # 4. ECDH shared secret + shared_secret = private_key.exchange(ec.ECDH(), ephemeral_pubkey) + + # 5. Derive symmetric AES key with HKDF + derived_key = HKDF( + algorithm=hashes.SHA256(), + length=32, + salt=salt, + info=b'ecies-encryption', + ).derive(shared_secret) + + # 6. Decrypt using AES-GCM + decryptor = Cipher( + algorithms.AES(derived_key), + modes.GCM(iv, tag), + ).decryptor() + + plaintext = decryptor.update(ciphertext) + decryptor.finalize() + + # 7. Return decoded message + return plaintext.decode('utf-8') + + def sign(self, content): + """ + Sign the message using ES256 (ECDSA w/ SHA256) via private key + """ + + try: + # Sign the message using ES256 (ECDSA w/ SHA256) + der_sig = self.private_key()\ + .sign(content, ec.ECDSA(hashes.SHA256())) + + except AttributeError: + # NoneType; could not load key + return None + + # Convert DER to raw R||S + r, s = decode_dss_signature(der_sig) + return r.to_bytes( + 32, byteorder='big') + s.to_bytes(32, byteorder='big') + + @property + def pub_keyfile(self): + """ + Returns the Public Keyfile Path if set otherwise it returns None + This property returns False if a keyfile was provided, but was invalid + """ + return None if not self._pub_keyfile \ + else (False if not self._pub_keyfile[0] + else self._pub_keyfile[0].path) + + @property + def prv_keyfile(self): + """ + Returns the Privat Keyfile Path if set otherwise it returns None + This property returns False if a keyfile was provided, but was invalid + """ + return None if not self._prv_keyfile \ + else (False if not self._prv_keyfile[0] + else self._prv_keyfile[0].path) + + @property + def x962_str(self): + """ + X962 serialization based on public key + """ + try: + return base64_urlencode( + self.public_key().public_bytes( + encoding=serialization.Encoding.X962, + format=serialization.PublicFormat.UncompressedPoint) + ) + except AttributeError: + # Public Key could not be generated (public_key() returned None) + return '' + + def __bool__(self): + """ + Returns True if at least 1 key was loaded + """ + return True if (self.public_key() or self.private_key()) else False diff --git a/test/test_apprise_utils.py b/test/test_apprise_utils.py index 29649d18..28491184 100644 --- a/test/test_apprise_utils.py +++ b/test/test_apprise_utils.py @@ -2752,6 +2752,30 @@ def test_cwe312_url(): '#random') == 'slack://test@B...4/J...M/X...3/' +def test_base64_encode_decode(): + """ + Utils:Base64:URLEncode & Decode + + """ + assert utils.base64.base64_urlencode(None) is None + assert utils.base64.base64_urlencode(42) is None + assert utils.base64.base64_urlencode(object) is None + assert utils.base64.base64_urlencode({}) is None + assert utils.base64.base64_urlencode("") is None + assert utils.base64.base64_urlencode("abc") is None + assert utils.base64.base64_urlencode(b"") == '' + assert utils.base64.base64_urlencode(b"abc") == 'YWJj' + + assert utils.base64.base64_urldecode(None) is None + assert utils.base64.base64_urldecode(42) is None + assert utils.base64.base64_urldecode(object) is None + assert utils.base64.base64_urldecode({}) is None + + assert utils.base64.base64_urldecode("abc") == b'i\xb7' + assert utils.base64.base64_urldecode("") == b'' + assert utils.base64.base64_urldecode('YWJj') == b'abc' + + def test_dict_base64_codec(tmpdir): """ Test encoding/decoding of base64 content diff --git a/test/test_plugin_vapid.py b/test/test_plugin_vapid.py new file mode 100644 index 00000000..f5eca671 --- /dev/null +++ b/test/test_plugin_vapid.py @@ -0,0 +1,390 @@ +# -*- coding: utf-8 -*- +# BSD 2-Clause License +# +# Apprise - Push Notification Library. +# Copyright (c) 2025, Chris Caron +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import os +import json +import requests +import pytest +from unittest import mock + +from apprise.plugins.vapid.subscription import ( + WebPushSubscription, WebPushSubscriptionManager) +from apprise.plugins.vapid import NotifyVapid +from apprise import exception, asset, url +from apprise.common import PersistentStoreMode +from apprise.utils.pem import ApprisePEMController +from helpers import AppriseURLTester + +# Disable logging for a cleaner testing output +import logging +logging.disable(logging.CRITICAL) + +# Attachment Directory +TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var') + +# a test UUID we can use +SUBSCRIBER = 'user@example.com' + +PLUGIN_ID = 'vapid' + +# Our Testing URLs +apprise_url_tests = ( + ('vapid://', { + 'instance': TypeError, + }), + ('vapid://:@/', { + 'instance': TypeError, + }), + ('vapid://invalid-subscriber', { + # An invalid Subscriber + 'instance': TypeError, + }), + ('vapid://user@example.com', { + # bare bone requirements met, but we don't have our subscription file + # or our private key (pem) + 'instance': NotifyVapid, + # We'll fail to respond because we would not have found any + # configuration to load + 'notify_response': False, + }), + ('vapid://user@example.com/newuser@example.com', { + # we don't have our subscription file or private key + 'instance': NotifyVapid, + 'notify_response': False, + }), + ('vapid://user@example.ca/newuser@example.ca', { + 'instance': NotifyVapid, + # force a failure + 'response': False, + 'requests_response_code': requests.codes.internal_server_error, + }), + ('vapid://user@example.uk/newuser@example.uk', { + 'instance': NotifyVapid, + # throw a bizzare code forcing us to fail to look it up + 'response': False, + 'requests_response_code': 999, + }), + ('vapid://user@example.au/newuser@example.au', { + 'instance': NotifyVapid, + # Throws a series of connection and transfer exceptions when this flag + # is set and tests that we gracfully handle them + 'test_requests_exceptions': True, + }), +) + + +@pytest.fixture +def patch_persistent_store_namespace(tmpdir): + """ + Force an easy to test environment + """ + with mock.patch.object(url.URLBase, 'url_id', return_value=PLUGIN_ID), \ + mock.patch.object( + asset.AppriseAsset, 'storage_mode', + PersistentStoreMode.AUTO), \ + mock.patch.object( + asset.AppriseAsset, 'storage_path', str(tmpdir)): + + tmp_dir = tmpdir.mkdir(PLUGIN_ID) + # Return the directory name + yield str(tmp_dir) + + +@pytest.fixture +def subscription_reference(): + return { + "user@example.com": { + "endpoint": 'https://fcm.googleapis.com/fcm/send/default', + "keys": { + "p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO' + '5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0', + "auth": 'k9Xzm43nBGo=', + }, + }, + "user1": { + "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123', + "keys": { + "p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO' + '5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0', + "auth": 'k9Xzm43nBGo=', + }, + }, + "user2": { + "endpoint": 'https://fcm.googleapis.com/fcm/send/def456', + "keys": { + "p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO' + '5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0', + "auth": 'k9Xzm43nBGo=', + }, + }, + } + + +def test_plugin_vapid_urls(): + """ + NotifyVapid() Apprise URLs - No Config + + """ + + # Run our general tests + AppriseURLTester(tests=apprise_url_tests).run_all() + + +def test_plugin_vapid_urls_with_required_assets( + patch_persistent_store_namespace, subscription_reference): + """ + NotifyVapid() Apprise URLs With Config + """ + + # Determine our store + pc = ApprisePEMController(path=patch_persistent_store_namespace) + assert pc.keygen() is True + + # Write our subscriptions file to disk + subscription_file = os.path.join( + patch_persistent_store_namespace, + NotifyVapid.vapid_subscription_file) + + with open(subscription_file, 'w') as f: + f.write(json.dumps(subscription_reference)) + + tests = ( + ('vapid://user@example.com', { + # user@example.com loaded (also used as subscriber id) + 'instance': NotifyVapid, + }), + ('vapid://user@example.com/newuser@example.com', { + # no newuser@example.com key entry + 'instance': NotifyVapid, + 'notify_response': False, + }), + ('vapid://user@example.com/user1?to=user2', { + # We'll succesfully notify 2 users + 'instance': NotifyVapid, + }), + ('vapid://user@example.com/default', { + 'instance': NotifyVapid, + # force a failure + 'response': False, + 'requests_response_code': requests.codes.internal_server_error, + }), + ('vapid://user@example.com/newuser@example.uk', { + 'instance': NotifyVapid, + # throw a bizzare code forcing us to fail to look it up + 'response': False, + 'requests_response_code': 999, + }), + ('vapid://user@example.com/newuser@example.au', { + 'instance': NotifyVapid, + # Throws a series of connection and transfer exceptions + # when this flag is set and tests that we gracfully handle them + 'test_requests_exceptions': True, + }), + ) + + AppriseURLTester(tests=tests).run_all() + + +def test_plugin_vapid_subscriptions(tmpdir): + """ + NotifyVapid() Subscriptions + + """ + + # Temporary directory + tmpdir0 = tmpdir.mkdir('tmp00') + + with pytest.raises(exception.AppriseInvalidData): + # Integer not supported + WebPushSubscription(42) + + with pytest.raises(exception.AppriseInvalidData): + # Not the correct format + WebPushSubscription('bad-content') + + with pytest.raises(exception.AppriseInvalidData): + # Invalid JSON + WebPushSubscription('{') + + with pytest.raises(exception.AppriseInvalidData): + # Empty Dictionary + WebPushSubscription({}) + + with pytest.raises(exception.AppriseInvalidData): + WebPushSubscription({ + "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123', + "keys": { + "p256dh": 'BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR=', + "auth": 42, + }, + }) + + with pytest.raises(exception.AppriseInvalidData): + WebPushSubscription({ + "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123', + "keys": { + "p256dh": 42, + "auth": 'k9Xzm43nBGo=', + }, + }) + + with pytest.raises(exception.AppriseInvalidData): + WebPushSubscription({ + "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123', + }) + + with pytest.raises(exception.AppriseInvalidData): + WebPushSubscription({ + "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123', + "keys": {}, + }) + + with pytest.raises(exception.AppriseInvalidData): + # Invalid p256dh public key provided + wps = WebPushSubscription({ + "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123', + "keys": { + "p256dh": 'BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR=', + "auth": 'k9Xzm43nBGo=', + }, + }) + + # An empty object + wps = WebPushSubscription() + assert bool(wps) is False + assert isinstance(wps.json(), str) + assert json.loads(wps.json()) + assert str(wps) == '' + assert wps.auth is None + assert wps.endpoint is None + assert wps.p256dh is None + assert wps.public_key is None + # We can't write anything as there is nothing loaded + assert wps.write(os.path.join(str(tmpdir0), 'subscriptions.json')) is False + + # A valid key + wps = WebPushSubscription({ + "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123', + "keys": { + "p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO' + '5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0', + "auth": 'k9Xzm43nBGo=', + }, + }) + + assert bool(wps) is True + assert isinstance(wps.json(), str) + assert json.loads(wps.json()) + assert str(wps) == 'abc123' + assert wps.auth == 'k9Xzm43nBGo=' + assert wps.endpoint == 'https://fcm.googleapis.com/fcm/send/abc123' + assert wps.p256dh == 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO' \ + '5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0' + assert wps.public_key is not None + + # Currently no files here + assert os.listdir(str(tmpdir0)) == [] + + # Bad content + assert wps.write(object) is False + assert wps.write(None) is False + # Can't write to a name already taken by as a directory + assert wps.write(str(tmpdir0)) is False + # Can't write to a name already taken by as a directory + assert wps.write(os.path.join(str(tmpdir0), 'subscriptions.json')) is True + assert os.listdir(str(tmpdir0)) == ['subscriptions.json'] + + +def test_plugin_vapid_subscription_manager(tmpdir): + """ + NotifyVapid() Subscription Manager + + """ + + # Temporary directory + tmpdir0 = tmpdir.mkdir('tmp00') + + smgr = WebPushSubscriptionManager() + + assert bool(smgr) is False + assert len(smgr) == 0 + + sub = { + "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123', + "keys": { + "p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO' + '5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0', + "auth": 'k9Xzm43nBGo=', + }, + } + + assert smgr.add(sub) is True + assert bool(smgr) is True + assert len(smgr) == 1 + + # Same sub (overwrites same slot) + smgr += sub + assert bool(smgr) is True + assert len(smgr) == 1 + + # indexed by value added + smgr['abc123'] = sub + assert bool(smgr) is True + assert len(smgr) == 1 + + assert isinstance(smgr['abc123'], WebPushSubscription) + + # Currently no files here + assert os.listdir(str(tmpdir0)) == [] + + # Write our content + assert smgr.write( + os.path.join(str(tmpdir0), 'subscriptions.json')) is True + + assert os.listdir(str(tmpdir0)) == ['subscriptions.json'] + + # Reset our object + smgr.clear() + assert bool(smgr) is False + assert len(smgr) == 0 + + # Load our content back + assert smgr.load( + os.path.join(str(tmpdir0), 'subscriptions.json')) is True + assert bool(smgr) is True + assert len(smgr) == 1 + + # Write over our file using the standard Subscription format + assert smgr['abc123'].write( + os.path.join(str(tmpdir0), 'subscriptions.json')) is True + + # We can still open this type as well + assert smgr.load( + os.path.join(str(tmpdir0), 'subscriptions.json')) is True + assert bool(smgr) is True + assert len(smgr) == 1 diff --git a/test/test_utils_pem.py b/test/test_utils_pem.py new file mode 100644 index 00000000..00888dbd --- /dev/null +++ b/test/test_utils_pem.py @@ -0,0 +1,98 @@ +# -*- coding: utf-8 -*- +# BSD 2-Clause License +# +# Apprise - Push Notification Library. +# Copyright (c) 2025, Chris Caron +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import logging +import os + +from apprise import AppriseAsset +from apprise import PersistentStoreMode +from apprise import utils + +# Disable logging for a cleaner testing output +logging.disable(logging.CRITICAL) + +# Attachment Directory +TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var') + + +def test_utils_pem_general(tmpdir): + """ + Utils:PEM + + """ + + tmpdir0 = tmpdir.mkdir('tmp00') + + # Currently no files here + assert os.listdir(str(tmpdir0)) == [] + + asset = AppriseAsset( + storage_mode=PersistentStoreMode.MEMORY, + storage_path=str(tmpdir0), + pem_autogen=False, + ) + + # Create a PEM Controller + pem_c = utils.pem.ApprisePEMController(path=None, asset=asset) + + # Nothing to lookup + assert pem_c.public_keyfile() is None + assert pem_c.public_key() is None + assert pem_c.x962_str == '' + assert pem_c.encrypt("message") is None + # Keys can not be generated in memory mode + assert pem_c.keygen() is False + + asset = AppriseAsset( + storage_mode=PersistentStoreMode.FLUSH, + storage_path=str(tmpdir0), + pem_autogen=False, + ) + + # No new files + assert os.listdir(str(tmpdir0)) == [] + + # Our asset is now write mode, so we will be able to generate a key + pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset) + # Nothing to lookup + assert pem_c.public_keyfile() is None + assert pem_c.public_key() is None + assert pem_c.x962_str == '' + assert pem_c.encrypt("message") is None + + # Keys can not be generated in memory mode + assert pem_c.keygen() is True + + # We have 2 new key files generated + assert os.listdir(str(tmpdir0)) == ['public_key.pem', 'private_key.pem'] + assert pem_c.public_keyfile() is not None + assert pem_c.public_key() is not None + assert len(pem_c.x962_str) > 20 + content = pem_c.encrypt("message") + assert isinstance(content, str) + assert pem_c.decrypt(content) == "message"