diff --git a/.coveragerc b/.coveragerc
index a739da1b..ed09dda5 100644
--- a/.coveragerc
+++ b/.coveragerc
@@ -15,4 +15,3 @@ source =
show_missing = True
skip_covered = True
skip_empty = True
-fail_under = 95.0
diff --git a/KEYWORDS b/KEYWORDS
index 52ca10a2..69a8502d 100644
--- a/KEYWORDS
+++ b/KEYWORDS
@@ -109,9 +109,11 @@ Threema Gateway
Twilio
Twist
Twitter
+Vapid
VictorOps
Voipms
Vonage
+Webpush
Webex
WeCom Bot
WhatsApp
diff --git a/README.md b/README.md
index 6059039e..3d111c97 100644
--- a/README.md
+++ b/README.md
@@ -132,6 +132,7 @@ The table below identifies the services this tool supports and some example serv
| [Telegram](https://github.com/caronc/apprise/wiki/Notify_telegram) | tgram:// | (TCP) 443 | tgram://bottoken/ChatID
tgram://bottoken/ChatID1/ChatID2/ChatIDN
| [Twitter](https://github.com/caronc/apprise/wiki/Notify_twitter) | twitter:// | (TCP) 443 | twitter://CKey/CSecret/AKey/ASecret
twitter://user@CKey/CSecret/AKey/ASecret
twitter://CKey/CSecret/AKey/ASecret/User1/User2/User2
twitter://CKey/CSecret/AKey/ASecret?mode=tweet
| [Twist](https://github.com/caronc/apprise/wiki/Notify_twist) | twist:// | (TCP) 443 | twist://pasword:login
twist://password:login/#channel
twist://password:login/#team:channel
twist://password:login/#team:channel1/channel2/#team3:channel
+| [Vapid (WebPush)](https://github.com/caronc/apprise/wiki/Notify_vapid) | vapid:// | (TCP) 443 | vapid://subscriber/target
vapid://subscriber/target?subfile=path&keyfile=path
| [Webex Teams (Cisco)](https://github.com/caronc/apprise/wiki/Notify_wxteams) | wxteams:// | (TCP) 443 | wxteams://Token
| [WeCom Bot](https://github.com/caronc/apprise/wiki/Notify_wecombot) | wecombot:// | (TCP) 443 | wecombot://BotKey
| [WhatsApp](https://github.com/caronc/apprise/wiki/Notify_whatsapp) | whatsapp:// | (TCP) 443 | whatsapp://AccessToken@FromPhoneID/ToPhoneNo
whatsapp://Template:AccessToken@FromPhoneID/ToPhoneNo
diff --git a/apprise/apprise_attachment.py b/apprise/apprise_attachment.py
index 855b42d4..e5b5d262 100644
--- a/apprise/apprise_attachment.py
+++ b/apprise/apprise_attachment.py
@@ -273,6 +273,17 @@ class AppriseAttachment:
return attach_plugin
+ def sync(self, abort_on_error=True, abort_if_empty=True):
+ """
+ Itereates over all of the attachments and retrieves them
+ """
+ # TODO: Change this to async for future
+
+ return False if abort_if_empty and not self.attachments else (
+ next((False for a in self.attachments if not a), True)
+ if abort_on_error
+ else next((True for a in self.attachments), True))
+
def clear(self):
"""
Empties our attachment list
diff --git a/apprise/asset.py b/apprise/asset.py
index aef90f08..f9399a77 100644
--- a/apprise/asset.py
+++ b/apprise/asset.py
@@ -149,6 +149,12 @@ class AppriseAsset:
# if Persistent Storage was set to `memory`
pgp_autogen = True
+ # Automatically generate our Privacy Enhanced Mail (PEM) keys if one isn't
+ # present and our environment configuration allows for it.
+ # For example, a case where the environment wouldn't allow for it would be
+ # if Persistent Storage was set to `memory`
+ pem_autogen = True
+
# For more detail see CWE-312 @
# https://cwe.mitre.org/data/definitions/312.html
#
diff --git a/apprise/exception.py b/apprise/exception.py
index 216e5cc7..8d7f86eb 100644
--- a/apprise/exception.py
+++ b/apprise/exception.py
@@ -53,6 +53,14 @@ class AppriseDiskIOError(AppriseException):
super().__init__(message, error_code=error_code)
+class AppriseInvalidData(AppriseException):
+ """
+ Thrown when bad data was passed into an internal function
+ """
+ def __init__(self, message, error_code=errno.EINVAL):
+ super().__init__(message, error_code=error_code)
+
+
class AppriseFileNotFound(AppriseDiskIOError, FileNotFoundError):
"""
Thrown when a persistent write occured in MEMORY mode
diff --git a/apprise/plugins/vapid/__init__.py b/apprise/plugins/vapid/__init__.py
new file mode 100644
index 00000000..83d30695
--- /dev/null
+++ b/apprise/plugins/vapid/__init__.py
@@ -0,0 +1,590 @@
+# -*- coding: utf-8 -*-
+# BSD 2-Clause License
+#
+# Apprise - Push Notification Library.
+# Copyright (c) 2025, Chris Caron
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice,
+# this list of conditions and the following disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
+# and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+
+import os
+import requests
+from itertools import chain
+from json import dumps
+from . import subscription
+from ..base import NotifyBase
+from ...common import NotifyType
+from ...common import NotifyImageSize
+from ...common import PersistentStoreMode
+from ...utils.parse import parse_list, parse_bool, is_email
+from ...utils.base64 import base64_urlencode
+from ...utils import pem as _pem
+from ...locale import gettext_lazy as _
+import time
+
+
+class VapidPushMode:
+ """
+ Supported Vapid Push Services
+ """
+ CHROME = 'chrome'
+ FIREFOX = 'firefox'
+ EDGE = 'edge'
+ OPERA = 'opera'
+ APPLE = 'apple'
+ SAMSUNG = 'samsung'
+ BRAVE = 'brave'
+ GENERIC = 'generic'
+
+
+VAPID_API_LOOKUP = {
+ VapidPushMode.CHROME:
+ 'https://fcm.googleapis.com/fcm/send',
+ VapidPushMode.FIREFOX:
+ 'https://updates.push.services.mozilla.com/wpush/v1',
+ VapidPushMode.EDGE:
+ 'https://fcm.googleapis.com/fcm/send', # Edge uses FCM too
+ VapidPushMode.OPERA:
+ 'https://fcm.googleapis.com/fcm/send', # Opera is Chromium-based
+ VapidPushMode.APPLE:
+ 'https://web.push.apple.com', # Apple Web Push base endpoint
+ VapidPushMode.BRAVE:
+ 'https://fcm.googleapis.com/fcm/send',
+ VapidPushMode.SAMSUNG:
+ 'https://fcm.googleapis.com/fcm/send',
+ VapidPushMode.GENERIC:
+ 'https://fcm.googleapis.com/fcm/send',
+}
+
+VAPID_PUSH_MODES = (
+ VapidPushMode.CHROME,
+ VapidPushMode.FIREFOX,
+ VapidPushMode.EDGE,
+ VapidPushMode.OPERA,
+ VapidPushMode.APPLE,
+)
+
+
+class NotifyVapid(NotifyBase):
+ """
+ A wrapper for WebPush/Vapid notifications
+ """
+ # Set our global enabled flag
+ enabled = subscription.CRYPTOGRAPHY_SUPPORT and _pem.PEM_SUPPORT
+
+ requirements = {
+ # Define our required packaging in order to work
+ 'packages_required': 'cryptography'
+ }
+
+ # The default descriptive name associated with the Notification
+ service_name = 'Vapid Web Push Notifications'
+
+ # The services URL
+ service_url = \
+ 'https://datatracker.ietf.org/doc/html/draft-thomson-webpush-vapid'
+
+ # The default protocol
+ secure_protocol = 'vapid'
+
+ # A URL that takes you to the setup/help of the specific protocol
+ setup_url = 'https://github.com/caronc/apprise/wiki/Notify_vapid'
+
+ # There is no reason we should exceed 5KB when reading in a PEM file.
+ # If it is more than this, then it is not accepted.
+ max_vapid_keyfile_size = 5000
+
+ # There is no reason we should exceed 5MB when reading in a JSON file.
+ # If it is more than this, then it is not accepted.
+ max_vapid_subfile_size = 5242880
+
+ # The maximum length of the messge can be 4096
+ # just choosing a safe number below this to allow for padding and
+ # encryption
+ body_maxlen = 4000
+
+ # A title can not be used for SMS Messages. Setting this to zero will
+ # cause any title (if defined) to get placed into the message body.
+ title_maxlen = 0
+
+ # Our default is to no not use persistent storage beyond in-memory
+ # reference; this allows us to auto-generate our config if needed
+ storage_mode = PersistentStoreMode.AUTO
+
+ # 43200 = 12 hours
+ vapid_jwt_expiration_sec = 43200
+
+ # Subscription file
+ vapid_subscription_file = 'subscriptions.json'
+
+ # Allows the user to specify the NotifyImageSize object
+ image_size = NotifyImageSize.XY_72
+
+ # Define object templates
+ templates = (
+ '{schema}://{subscriber}',
+ '{schema}://{subscriber}/{targets}',
+ )
+
+ # Define our template tokens
+ template_tokens = dict(NotifyBase.template_tokens, **{
+ 'subscriber': {
+ 'name': _('API Key'),
+ 'type': 'string',
+ 'private': True,
+ 'required': True,
+ },
+ 'targets': {
+ 'name': _('Targets'),
+ 'type': 'list:string',
+ },
+ })
+
+ # Define our template args
+ template_args = dict(NotifyBase.template_tokens, **{
+ 'mode': {
+ 'name': _('Mode'),
+ 'type': 'choice:string',
+ 'values': VAPID_PUSH_MODES,
+ 'default': VAPID_PUSH_MODES[0],
+ 'map_to': 'mode',
+ },
+ # Default Time To Live (defined in seconds)
+ # 0 (Zero) - message will be delivered only if the device is reacheable
+ 'ttl': {
+ 'name': _('ttl'),
+ 'type': 'int',
+ 'default': 0,
+ 'min': 0,
+ 'max': 60,
+ },
+ 'to': {
+ 'alias_of': 'targets',
+ },
+ 'from': {
+ 'alias_of': 'subscriber',
+ },
+ 'keyfile': {
+ # A Private Keyfile is required to sign header
+ 'name': _('PEM Private KeyFile'),
+ 'type': 'string',
+ 'private': True,
+ },
+ 'subfile': {
+ # A Subscripion File is required to sign header
+ 'name': _('Subscripion File'),
+ 'type': 'string',
+ 'private': True,
+ },
+ 'image': {
+ 'name': _('Include Image'),
+ 'type': 'bool',
+ 'default': True,
+ 'map_to': 'include_image',
+ },
+ })
+
+ def __init__(self, subscriber, mode=None, targets=None, keyfile=None,
+ subfile=None, include_image=None, ttl=None, **kwargs):
+ """
+ Initialize Vapid Messaging
+
+ """
+ super().__init__(**kwargs)
+
+ # Path to our Private Key file
+ self.keyfile = None
+
+ # Path to our subscription.json file
+ self.subfile = None
+
+ #
+ # Our Targets
+ #
+ self.targets = []
+ self._invalid_targets = []
+
+ # default subscriptions
+ self.subscriptions = {}
+ self.subscriptions_loaded = False
+ self.private_key_loaded = False
+
+ # Set our Time to Live Flag
+ self.ttl = self.template_args['ttl']['default']
+ if ttl is not None:
+ try:
+ self.ttl = int(ttl)
+
+ except (ValueError, TypeError):
+ # Do nothing
+ pass
+
+ if self.ttl < self.template_args['ttl']['min'] or \
+ self.ttl > self.template_args['ttl']['max']:
+ msg = 'The Vapid TTL specified ({}) is out of range.'\
+ .format(self.ttl)
+ self.logger.warning(msg)
+ raise TypeError(msg)
+
+ # Place a thumbnail image inline with the message body
+ self.include_image = \
+ self.template_args['image']['default'] \
+ if include_image is None else include_image
+
+ result = is_email(subscriber)
+ if not result:
+ msg = 'An invalid Vapid Subscriber' \
+ '({}) was specified.'.format(subscriber)
+ self.logger.warning(msg)
+ raise TypeError(msg)
+ self.subscriber = result['full_email']
+
+ # Store our Mode/service
+ try:
+ self.mode = \
+ NotifyVapid.template_args['mode']['default'] \
+ if mode is None else mode.lower()
+
+ if self.mode not in VAPID_PUSH_MODES:
+ # allow the outer except to handle this common response
+ raise
+ except:
+ # Invalid region specified
+ msg = 'The Vapid mode specified ({}) is invalid.' \
+ .format(mode)
+ self.logger.warning(msg)
+ raise TypeError(msg)
+
+ # Our Private keyfile
+ self.keyfile = keyfile
+
+ # Our Subscription file
+ self.subfile = subfile
+
+ # Prepare our PEM Object
+ self.pem = _pem.ApprisePEMController(self.store.path, asset=self.asset)
+
+ # Create our subscription object
+ self.subscriptions = subscription.WebPushSubscriptionManager(
+ asset=self.asset)
+
+ if self.subfile is None and \
+ self.store.mode != PersistentStoreMode.MEMORY and \
+ self.asset.pem_autogen:
+
+ self.subfile = os.path.join(
+ self.store.path, self.vapid_subscription_file)
+ if not os.path.exists(self.subfile) and \
+ self.subscriptions.write(self.subfile):
+ self.logger.info(
+ 'Vapid auto-generated %s/%s',
+ os.path.basename(self.store.path),
+ self.vapid_subscription_file)
+
+ # Acquire our targets for parsing
+ self.targets = parse_list(targets)
+ if not self.targets:
+ # Add ourselves
+ self.targets.append(self.subscriber)
+
+ return
+
+ def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
+ """
+ Perform Vapid Notification
+ """
+ if not self.private_key_loaded and ((
+ self.keyfile and not self.pem.private_key(
+ autogen=False, autodetect=False)
+ and not self.pem.load_private_key(self.keyfile))
+ or (not self.keyfile and not self.pem)):
+ self.logger.warning(
+ 'Provided Vapid/WebPush (PEM) Private Key file could '
+ 'not be loaded.')
+ self.private_key_loaded = True
+ return False
+ else:
+ self.private_key_loaded = True
+
+ if not self.targets:
+ # There is no one to notify; we're done
+ self.logger.warning('There are no Vapid targets to notify')
+ return False
+
+ if not self.subscriptions_loaded and self.subfile:
+ # Toggle our loaded flag to prevent trying again later
+ self.subscriptions_loaded = True
+ if not self.subscriptions.load(
+ self.subfile, byte_limit=self.max_vapid_subfile_size):
+ self.logger.warning(
+ 'Provided Vapid/WebPush subscriptions file could not be '
+ 'loaded.')
+ return False
+
+ if not self.subscriptions:
+ self.logger.warning('Vapid could not load subscriptions')
+ return False
+
+ if not self.pem.private_key(autogen=False, autodetect=False):
+ self.logger.warning(
+ 'No Vapid/WebPush (PEM) Private Key file could be loaded.')
+ return False
+
+ # Prepare our notify URL (based on our mode)
+ notify_url = VAPID_API_LOOKUP[self.mode]
+ headers = {
+ 'User-Agent': self.app_id,
+ "TTL": str(self.ttl),
+ "Content-Encoding": "aes128gcm",
+ "Content-Type": "application/octet-stream",
+ "Authorization": f"vapid t={self.jwt_token}, k={self.public_key}",
+ }
+
+ has_error = False
+
+ # Create a copy of the targets list
+ targets = list(self.targets)
+ while len(targets):
+ target = targets.pop(0)
+ if target not in self.subscriptions:
+ self.logger.warning(
+ 'Dropped Vapid user '
+ '(%s) specified - not found in subscriptions.json.' %
+ target,
+ )
+ # Save ourselves from doing this again
+ self._invalid_targets.append(target)
+ self.targets.remove(target)
+ has_error = True
+ continue
+
+ # Encrypt our payload
+ encrypted_payload = self.pem.encrypt_webpush(
+ body,
+ public_key=self.subscriptions[target].public_key,
+ auth_secret=self.subscriptions[target].auth_secret)
+
+ self.logger.debug(
+ 'Vapid %s POST URL: %s (cert_verify=%r)',
+ self.mode, notify_url, self.verify_certificate,
+ )
+ self.logger.debug(
+ 'Vapid %s Encrypted Payload: %d byte(s)', self.mode, len(body))
+
+ # Always call throttle before any remote server i/o is made
+ self.throttle()
+ try:
+ r = requests.post(
+ notify_url,
+ data=encrypted_payload,
+ headers=headers,
+ verify=self.verify_certificate,
+ timeout=self.request_timeout,
+ )
+ if r.status_code not in (
+ requests.codes.ok, requests.codes.no_content):
+ # We had a problem
+ status_str = \
+ NotifyBase.http_response_code_lookup(r.status_code)
+
+ self.logger.warning(
+ 'Failed to send {} Vapid notification: '
+ '{}{}error={}.'.format(
+ self.mode,
+ status_str,
+ ', ' if status_str else '',
+ r.status_code))
+
+ self.logger.debug(
+ 'Response Details:\r\n%s', r.content)
+
+ has_error = True
+
+ else:
+ self.logger.info('Sent %s Vapid notification.', self.mode)
+
+ except requests.RequestException as e:
+ self.logger.warning(
+ 'A Connection error occurred sending Vapid '
+ 'notification.'
+ )
+ self.logger.debug('Socket Exception: %s', str(e))
+
+ has_error = True
+
+ return not has_error
+
+ @property
+ def url_identifier(self):
+ """
+ Returns all of the identifiers that make this URL unique from
+ another simliar one. Targets or end points should never be identified
+ here.
+ """
+ return (self.secure_protocol, self.mode, self.subscriber)
+
+ def url(self, privacy=False, *args, **kwargs):
+ """
+ Returns the URL built dynamically based on specified arguments.
+ """
+
+ # Define any URL parameters
+ params = {
+ 'mode': self.mode,
+ 'ttl': str(self.ttl),
+ }
+
+ if self.keyfile:
+ # Include our keyfile if specified
+ params['keyfile'] = self.keyfile
+
+ if self.subfile:
+ # Include our subfile if specified
+ params['subfile'] = self.subfile
+
+ # Extend our parameters
+ params.update(self.url_parameters(privacy=privacy, *args, **kwargs))
+
+ targets = self.targets if not (
+ self.targets == 1 and
+ self.targets[0].lower() == self.subscriber.lower()) else []
+ return '{schema}://{subscriber}/{targets}?{params}'.format(
+ schema=self.secure_protocol,
+ subscriber=NotifyVapid.quote(self.subscriber, safe='@'),
+ targets='/'.join(chain(
+ [str(t) for t in targets],
+ [NotifyVapid.quote(x, safe='@')
+ for x in self._invalid_targets])),
+ params=NotifyVapid.urlencode(params),
+ )
+
+ def __len__(self):
+ """
+ Returns the number of targets associated with this notification
+ """
+ targets = len(self.targets)
+ return targets if targets else 1
+
+ @staticmethod
+ def parse_url(url):
+ """
+ Parses the URL and returns enough arguments that can allow
+ us to re-instantiate this object.
+
+ """
+ results = NotifyBase.parse_url(url, verify_host=False)
+ if not results:
+ # We're done early as we couldn't load the results
+ return results
+
+ # Prepare our targets
+ results['targets'] = []
+ if 'from' in results['qsd'] and len(results['qsd']['from']):
+ results['subscriber'] = \
+ NotifyVapid.unquote(results['qsd']['from'])
+
+ if results['user'] and results['host']:
+ # whatever is left on the URL goes
+ results['targets'].append('{}@{}'.format(
+ NotifyVapid.unquote(results['user']),
+ NotifyVapid.unquote(results['host']),
+ ))
+
+ elif results['host']:
+ results['targets'].append(
+ NotifyVapid.unquote(results['host']))
+
+ else:
+ # Acquire our subscriber information
+ results['subscriber'] = '{}@{}'.format(
+ NotifyVapid.unquote(results['user']),
+ NotifyVapid.unquote(results['host']),
+ )
+
+ results['targets'].extend(
+ NotifyVapid.split_path(results['fullpath']))
+
+ # Get our mode
+ results['mode'] = results['qsd'].get('mode')
+
+ # Get Image Flag
+ results['include_image'] = \
+ parse_bool(results['qsd'].get(
+ 'image', NotifyVapid.template_args['image']['default']))
+
+ # The 'to' makes it easier to use yaml configuration
+ if 'to' in results['qsd'] and len(results['qsd']['to']):
+ results['targets'] += \
+ NotifyVapid.parse_list(results['qsd']['to'])
+
+ # Our Private Keyfile (PEM)
+ if 'keyfile' in results['qsd'] and results['qsd']['keyfile']:
+ results['keyfile'] = \
+ NotifyVapid.unquote(results['qsd']['keyfile'])
+
+ # Our Subscription File (JSON)
+ if 'subfile' in results['qsd'] and results['qsd']['subfile']:
+ results['subfile'] = \
+ NotifyVapid.unquote(results['qsd']['subfile'])
+
+ # Support the 'ttl' variable
+ if 'ttl' in results['qsd'] and len(results['qsd']['ttl']):
+ results['ttl'] = \
+ NotifyVapid.unquote(results['qsd']['ttl'])
+
+ return results
+
+ @property
+ def jwt_token(self):
+ """
+ Returns our VAPID Token based on class details
+ """
+ # JWT header
+ header = {
+ "alg": "ES256",
+ "typ": "JWT"
+ }
+
+ # JWT payload
+ payload = {
+ "aud": VAPID_API_LOOKUP[self.mode],
+ "exp": int(time.time()) + self.vapid_jwt_expiration_sec,
+ "sub": f"mailto:{self.subscriber}"
+ }
+
+ # Base64 URL encode header and payload
+ header_b64 = base64_urlencode(
+ dumps(header, separators=(",", ":")).encode('utf-8'))
+ payload_b64 = base64_urlencode(
+ dumps(payload, separators=(",", ":")).encode('utf-8'))
+ signing_input = f"{header_b64}.{payload_b64}".encode('utf-8')
+ signature_b64 = base64_urlencode(self.pem.sign(signing_input))
+
+ # Return final token
+ return f"{header_b64}.{payload_b64}.{signature_b64}"
+
+ @property
+ def public_key(self):
+ """
+ Returns our public key representation
+ """
+ return self.pem.x962_str
diff --git a/apprise/plugins/vapid/subscription.py b/apprise/plugins/vapid/subscription.py
new file mode 100644
index 00000000..27a79eda
--- /dev/null
+++ b/apprise/plugins/vapid/subscription.py
@@ -0,0 +1,431 @@
+# -*- coding: utf-8 -*-
+# BSD 2-Clause License
+#
+# Apprise - Push Notification Library.
+# Copyright (c) 2025, Chris Caron
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice,
+# this list of conditions and the following disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
+# and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+
+import json
+from typing import Optional, Union
+from ...asset import AppriseAsset
+from ...utils.base64 import base64_urldecode
+from ...exception import AppriseInvalidData
+from ...apprise_attachment import AppriseAttachment
+
+try:
+ from cryptography.hazmat.primitives.asymmetric import ec
+
+ # Cryptography Support enabled
+ CRYPTOGRAPHY_SUPPORT = True
+
+except ImportError:
+ # Cryptography Support disabled
+ CRYPTOGRAPHY_SUPPORT = False
+
+
+class WebPushSubscription:
+ """
+ WebPush Subscription
+ """
+ # Format:
+ # {
+ # "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...",
+ # "keys": {
+ # "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...",
+ # "auth": "k9Xzm43nBGo=",
+ # }
+ # }
+ def __init__(self, content: Union[str, dict, None] = None) -> None:
+ """
+ Prepares a webpush object provided with content
+ Content can be a dictionary, or JSON String
+ """
+
+ # Our variables
+ self.__endpoint = None
+ self.__p256dh = None
+ self.__auth = None
+ self.__auth_secret = None
+ self.__public_key = None
+
+ if content is not None:
+ if not self.load(content):
+ raise AppriseInvalidData('Could not load subscription')
+
+ def load(self, content: Union[str, dict, None] = None) -> bool:
+ """
+ Performs the loading/validation of the object
+ """
+
+ # Reset our variables
+ self.__endpoint = None
+ self.__p256dh = None
+ self.__auth = None
+ self.__auth_secret = None
+ self.__public_key = None
+
+ if not CRYPTOGRAPHY_SUPPORT:
+ return False
+
+ if isinstance(content, str):
+ try:
+ content = json.loads(content)
+
+ except (json.decoder.JSONDecodeError, TypeError, OSError):
+ # Bad data
+ return False
+
+ if not isinstance(content, dict):
+ # We could not load he result set
+ return False
+
+ # Retreive our contents for validation
+ endpoint = content.get('endpoint')
+ if not isinstance(endpoint, str):
+ return False
+
+ try:
+ p256dh = base64_urldecode(content['keys']['p256dh'])
+ if not p256dh:
+ return False
+
+ auth_secret = base64_urldecode(content['keys']['auth'])
+ if not auth_secret:
+ return False
+
+ except KeyError:
+ return False
+
+ try:
+ # Store our data
+ self.__public_key = ec.EllipticCurvePublicKey.from_encoded_point(
+ ec.SECP256R1(), p256dh,
+ )
+
+ except ValueError:
+ # Invalid p256dh key (Can't load Public Key)
+ return False
+
+ self.__endpoint = endpoint
+ self.__p256dh = content['keys']['p256dh']
+ self.__auth = content['keys']['auth']
+ self.__auth_secret = auth_secret
+
+ return True
+
+ def write(self, path: str, indent: int = 2) -> bool:
+ """
+ Writes content to disk based on path specified. Content is a JSON
+ file, so ideally you may wish to have `.json' as it's extension for
+ clarity
+ """
+ if not self.__public_key:
+ return False
+
+ try:
+ with open(path, 'w', encoding='utf-8') as f:
+ json.dump(self.dict, f, indent=indent)
+
+ except (TypeError, OSError):
+ # Could not write content
+ return False
+
+ return True
+
+ @property
+ def auth(self) -> Optional[str]:
+ return self.__auth if self.__public_key else None
+
+ @property
+ def endpoint(self) -> Optional[str]:
+ return self.__endpoint if self.__public_key else None
+
+ @property
+ def p256dh(self) -> Optional[str]:
+ return self.__p256dh if self.__public_key else None
+
+ @property
+ def auth_secret(self) -> Optional[bytes]:
+ return self.__auth_secret if self.__public_key else None
+
+ @property
+ def public_key(self) -> Optional['ec.EllipticCurvePublicKey']:
+ return self.__public_key
+
+ @property
+ def dict(self) -> dict:
+ return {
+ "endpoint": self.__endpoint,
+ "keys": {
+ "p256dh": self.__p256dh,
+ "auth": self.__auth,
+ },
+ } if self.__public_key else {
+ "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123...',
+ "keys": {
+ "p256dh": '',
+ "auth": '',
+ },
+ }
+
+ def json(self, indent: int = 2) -> str:
+ """
+ Returns JSON representation of the object
+ """
+ return json.dumps(self.dict, indent=indent)
+
+ def __bool__(self) -> bool:
+ """
+ handle 'if' statement
+ """
+ return True if self.__public_key else False
+
+ def __str__(self) -> str:
+ """
+ Returns our JSON entry as a string
+ """
+ # Return the first 16 characters of the detected endpoint subscription
+ # id
+ return '' if not self.__endpoint \
+ else self.__endpoint.split('/')[-1][:16]
+
+
+class WebPushSubscriptionManager:
+ """
+ WebPush Subscription Manager
+ """
+ # Format:
+ # {
+ # "name1": {
+ # "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...",
+ # "keys": {
+ # "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...",
+ # "auth": "k9Xzm43nBGo=",
+ # }
+ # },
+ # "name2": {
+ # "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...",
+ # "keys": {
+ # "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...",
+ # "auth": "k9Xzm43nBGo=",
+ # }
+ # },
+
+ # Defines the number of failures we can accept before we abort and assume
+ # the file is bad
+ max_load_failure_count = 3
+
+ def __init__(self, asset: Optional['AppriseAsset'] = None) -> None:
+ """
+ Webpush Subscription Manager
+ """
+
+ # Our subscriptions
+ self.__subscriptions = {}
+
+ # Prepare our Asset Object
+ self.asset = \
+ asset if isinstance(asset, AppriseAsset) else AppriseAsset()
+
+ def __getitem__(self, key: str) -> WebPushSubscription:
+ """
+ Returns our indexed value if it exists
+ """
+ return self.__subscriptions[key.lower()]
+
+ def __setitem__(self, name: str,
+ subscription: Union[WebPushSubscription, str, dict]
+ ) -> None:
+ """
+ Set's our object if possible
+ """
+
+ if not self.add(subscription, name=name.lower()):
+ raise AppriseInvalidData('Invalid subscription provided')
+
+ def add(self, subscription: Union[WebPushSubscription, str, dict],
+ name: Optional[str] = None) -> bool:
+ """
+ Add a subscription into our manager
+ """
+
+ if not isinstance(subscription, WebPushSubscription):
+ try:
+ # Support loading our object
+ subscription = WebPushSubscription(subscription)
+
+ except AppriseInvalidData:
+ return False
+
+ if name is None:
+ name = str(subscription)
+
+ self.__subscriptions[name.lower()] = subscription
+ return True
+
+ def __bool__(self) -> bool:
+ """
+ True is returned if at least one subscription has been loaded.
+ """
+ return True if self.__subscriptions else False
+
+ def __len__(self) -> int:
+ """
+ Returns the number of servers loaded; this includes those found within
+ loaded configuration. This funtion nnever actually counts the
+ Config entry themselves (if they exist), only what they contain.
+ """
+ return len(self.__subscriptions)
+
+ def __iadd__(self, subscription: Union[WebPushSubscription, str, dict]
+ ) -> 'WebPushSubscriptionManager':
+
+ if not self.add(subscription):
+ raise AppriseInvalidData('Invalid subscription provided')
+
+ return self
+
+ def __contains__(self, key: str) -> bool:
+ """
+ Checks if the key exists
+ """
+ return key.lower() in self.__subscriptions
+
+ def clear(self) -> None:
+ """
+ Empties our server list
+
+ """
+ self.__subscriptions.clear()
+
+ @property
+ def dict(self) -> dict:
+ """
+ Returns a dictionary of all entries
+ """
+ return {k: v.dict for k, v in self.__subscriptions.items()} \
+ if self.__subscriptions else {}
+
+ def load(self, path: str, byte_limit=0) -> bool:
+ """
+ Writes content to disk based on path specified. Content is a JSON
+ file, so ideally you may wish to have `.json' as it's extension for
+ clarity
+
+ if byte_limit is zero, then we do not limit our file size, otherwise
+ set this to the bytes you want to restrict yourself by
+ """
+
+ # Reset our object
+ self.clear()
+
+ # Create our attachment object
+ attach = AppriseAttachment(asset=self.asset)
+
+ # Add our path
+ attach.add(path)
+
+ if byte_limit > 0:
+ # Enforce maximum file size
+ attach[0].max_file_size = byte_limit
+
+ if not attach.sync():
+ return False
+
+ try:
+ # Otherwise open our path
+ with open(attach[0].path, 'r', encoding='utf-8') as f:
+ content = json.load(f)
+
+ except (json.decoder.JSONDecodeError, TypeError, OSError):
+ # Could not read
+ return False
+
+ if not isinstance(content, dict):
+ # Not a list of dictionaries
+ return False
+
+ # Verify if we're dealing with a single element:
+ # {
+ # "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...",
+ # "keys": {
+ # "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...",
+ # "auth": "k9Xzm43nBGo=",
+ # }
+ # }
+ #
+ # or if we're dealing with a multiple set
+ #
+ # {
+ # "name1": {
+ # "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...",
+ # "keys": {
+ # "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...",
+ # "auth": "k9Xzm43nBGo=",
+ # }
+ # },
+ # "name2": {
+ # "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...",
+ # "keys": {
+ # "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...",
+ # "auth": "k9Xzm43nBGo=",
+ # }
+ # },
+
+ error_count = 0
+ if 'endpoint' in content and 'keys' in content:
+ if not self.add(content):
+ return False
+
+ else:
+ for name, subscription in content.items():
+ if not self.add(subscription, name=name.lower()):
+ error_count += 1
+ if error_count > self.max_load_failure_count:
+ self.clear()
+ return False
+
+ return True
+
+ def write(self, path: str, indent: int = 2) -> bool:
+ """
+ Writes content to disk based on path specified. Content is a JSON
+ file, so ideally you may wish to have `.json' as it's extension for
+ clarity
+ """
+ try:
+ with open(path, 'w', encoding='utf-8') as f:
+ json.dump(self.dict, f, indent=indent)
+
+ except (TypeError, OSError):
+ # Could not write content
+ return False
+
+ return True
+
+ def json(self, indent: int = 2) -> str:
+ """
+ Returns JSON representation of the object
+ """
+ return json.dumps(self.dict, indent=indent)
diff --git a/apprise/utils/base64.py b/apprise/utils/base64.py
index e5a8a568..33bb4934 100644
--- a/apprise/utils/base64.py
+++ b/apprise/utils/base64.py
@@ -32,6 +32,33 @@ import typing
import base64
+def base64_urlencode(data: bytes) -> str:
+ """
+ URL Safe Base64 Encoding
+ """
+ try:
+ return base64.urlsafe_b64encode(data).rstrip(b'=').decode('utf-8')
+
+ except TypeError:
+ # data is not supported; avoid raising exception
+ return None
+
+
+def base64_urldecode(data: str) -> bytes:
+ """
+ URL Safe Base64 Encoding
+ """
+
+ try:
+ # Normalize base64url string (remove padding, add it back)
+ padding = '=' * (-len(data) % 4)
+ return base64.urlsafe_b64decode(data + padding)
+
+ except TypeError:
+ # data is not supported; avoid raising exception
+ return None
+
+
def decode_b64_dict(di: dict) -> dict:
"""
decodes base64 dictionary previously encoded
diff --git a/apprise/utils/pem.py b/apprise/utils/pem.py
new file mode 100644
index 00000000..67b948a1
--- /dev/null
+++ b/apprise/utils/pem.py
@@ -0,0 +1,825 @@
+# -*- coding: utf-8 -*-
+# BSD 2-Clause License
+#
+# Apprise - Push Notification Library.
+# Copyright (c) 2025, Chris Caron
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice,
+# this list of conditions and the following disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
+# and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+
+import os
+import json
+import base64
+import binascii
+import struct
+from typing import Union, Optional
+from ..utils.base64 import base64_urlencode, base64_urldecode
+from ..apprise_attachment import AppriseAttachment
+from ..asset import AppriseAsset
+from ..logger import logger
+from ..exception import ApprisePluginException
+
+try:
+ from cryptography.hazmat.backends import default_backend
+ from cryptography.hazmat.primitives.kdf.hkdf import HKDF
+ from cryptography.hazmat.primitives import serialization, hashes
+ from cryptography.hazmat.primitives.ciphers import (
+ Cipher, algorithms, modes)
+ from cryptography.exceptions import InvalidTag
+ from cryptography.hazmat.primitives.ciphers.aead import AESGCM
+ from cryptography.hazmat.primitives.asymmetric import ec
+ from cryptography.hazmat.primitives.serialization import (
+ Encoding,
+ NoEncryption,
+ PrivateFormat,
+ PublicFormat,
+ )
+ from cryptography.hazmat.primitives.asymmetric.utils import (
+ decode_dss_signature
+ )
+
+ # PEM Support enabled
+ PEM_SUPPORT = True
+
+except ImportError:
+ # PEM Support disabled
+ PEM_SUPPORT = False
+
+
+class ApprisePEMException(ApprisePluginException):
+ """
+ Thrown when there is an error with the PEM Controller
+ """
+ def __init__(self, message, error_code=612):
+ super().__init__(message, error_code=error_code)
+
+
+class ApprisePEMController:
+ """
+ PEM Controller Tool for the Apprise Library
+ """
+
+ # There is no reason a PEM Public Key should exceed 8K in size
+ # If it is more than this, then it is not accepted
+ max_pem_public_key_size = 8000
+
+ # There is no reason a PEM Private Key should exceed 8K in size
+ # If it is more than this, then it is not accepted
+ max_pem_private_key_size = 8000
+
+ # Maximum Vapid Message Size
+ max_webpush_record_size = 4096
+
+ def __init__(self, path: str,
+ pub_keyfile: Optional[str] = None,
+ prv_keyfile: Optional[str] = None,
+ name: Optional[str] = None,
+ asset: Optional[AppriseAsset] = None,
+ **kwargs) -> None:
+ """
+ Path should be the directory keys can be written and read from such as
+ .store.path
+
+ Optionally additionally specify a keyfile to explicitly open
+ """
+
+ # Directory we can work with
+ self.path = path
+
+ # Prepare our Key Placeholders
+ self.__private_key = None
+ self.__public_key = None
+
+ # Our name (id)
+ self.name = name.strip(' \t/-+!$@#*').lower() \
+ if isinstance(name, str) else ''
+
+ # Prepare our Asset Object
+ self.asset = \
+ asset if isinstance(asset, AppriseAsset) else AppriseAsset()
+
+ # Our temporary reference points
+ self._prv_keyfile = AppriseAttachment(asset=self.asset)
+ self._pub_keyfile = AppriseAttachment(asset=self.asset)
+
+ if prv_keyfile:
+ self.load_private_key(prv_keyfile)
+
+ elif pub_keyfile:
+ self.load_public_key(pub_keyfile)
+
+ else:
+ self._pub_keyfile = None
+
+ def load_private_key(self, path: Optional[str] = None,
+ *names: str) -> bool:
+ """
+ Load Private key and from that we can prepare our public key
+ """
+
+ if path is None:
+ # Auto-load our content
+ return True if self.private_keyfile(*names) else False
+
+ # Create ourselves an Attachment to work with; this grants us the
+ # ability to pull this key from a remote site or anything else
+ # supported by the Attachment object
+ self._prv_keyfile = AppriseAttachment(asset=self.asset)
+
+ # Add our definition to our pem_key reference
+ self._prv_keyfile.add(path)
+
+ # Enforce maximum file size
+ self._prv_keyfile[0].max_file_size = self.max_pem_private_key_size
+
+ #
+ # Reset Public key
+ #
+ self._pub_keyfile = AppriseAttachment(asset=self.asset)
+
+ #
+ # Reset our internal keys
+ #
+ self.__private_key = None
+ self.__public_key = None
+
+ if not self._prv_keyfile.sync():
+ # Early exit
+ logger.error(
+ 'Could not access PEM Private Key {}.'.format(path))
+ return False
+
+ try:
+ with open(self._prv_keyfile[0].path, "rb") as f:
+ self.__private_key = serialization.load_pem_private_key(
+ f.read(),
+ password=None, # or provide the password if encrypted
+ backend=default_backend()
+ )
+
+ except (ValueError, TypeError):
+ logger.debug(
+ 'PEM Private Key file specified is not supported (%s)',
+ type(path))
+ return False
+
+ except FileNotFoundError:
+ logger.debug('PEM Private Key file not found: %s', path)
+ return False
+
+ except OSError as e:
+ logger.warning('Error accessing PEM Private Key file %s', path)
+ logger.debug(f'I/O Exception: {e}')
+ return False
+
+ #
+ # Generate our public key
+ #
+ self.__public_key = self.__private_key.public_key()
+
+ # Load our private key
+ return True if self.__private_key else False
+
+ def load_public_key(self, path: Optional[str] = None, *names: str) -> bool:
+ """
+ Load Public key only
+
+ Note: with just a public key you can only decrypt, encryption is not
+ possible.
+ """
+
+ if path is None:
+ # Auto-load our content
+ return True if self.public_keyfile(*names) else False
+
+ # Create ourselves an Attachment to work with; this grants us the
+ # ability to pull this key from a remote site or anything else
+ # supported by the Attachment object
+ self._pub_keyfile = AppriseAttachment(asset=self.asset)
+
+ # Add our definition to our pem_key reference
+ self._pub_keyfile.add(path)
+
+ # Enforce maximum file size
+ self._pub_keyfile[0].max_file_size = self.max_pem_public_key_size
+
+ #
+ # Reset Private key
+ #
+ self._prv_keyfile = AppriseAttachment(asset=self.asset)
+
+ #
+ # Reset our internal keys
+ #
+ self.__private_key = None
+ self.__public_key = None
+
+ if not self._pub_keyfile.sync():
+ # Early exit
+ logger.error(
+ 'Could not access PEM Public Key {}.'.format(path))
+ return False
+
+ try:
+ with open(path, 'rb') as key_file:
+ self.__public_key = serialization.load_pem_public_key(
+ key_file.read(),
+ backend=default_backend()
+ )
+
+ except (ValueError, TypeError):
+ logger.debug(
+ 'PEM Public Key file specified is not supported (%s)',
+ type(path))
+ return False
+
+ except FileNotFoundError:
+ # Generate keys
+ logger.debug('PEM Public Key file not found: %s', path)
+ return False
+
+ except OSError as e:
+ logger.warning('Error accessing PEM Public Key file %s', path)
+ logger.debug(f'I/O Exception: {e}')
+ return False
+
+ # Load our private key
+ return True if self.__public_key else False
+
+ def keygen(self, name: 'Optional[str]' = None, force: bool = False):
+ """
+ Generates a set of keys based on name configured.
+ """
+
+ if not PEM_SUPPORT:
+ msg = 'PEM Support unavailable; install cryptography library'
+ logger.warning(msg)
+ raise ApprisePEMException(msg)
+
+ # Detect if a key has been loaded or not
+ has_key = True if self.private_key(autogen=False) \
+ or self.public_key(autogen=False) else False
+
+ if (has_key and not (name or force)) or not self.path:
+ logger.trace(
+ 'PEM keygen disabled, reason=%s',
+ 'keyfile-defined' if not has_key
+ else 'no-write-path')
+ return False
+
+ # Create a new private/public key pair
+ self.__private_key = ec.generate_private_key(
+ ec.SECP256R1(), default_backend())
+ self.__public_key = self.__private_key.public_key()
+
+ #
+ # Prepare our PEM formatted output files
+ #
+ private_key = self.__private_key.private_bytes(
+ Encoding.PEM,
+ PrivateFormat.PKCS8,
+ encryption_algorithm=NoEncryption(),
+ )
+
+ public_key = self.__public_key.public_bytes(
+ encoding=Encoding.PEM,
+ format=PublicFormat.SubjectPublicKeyInfo,
+ )
+
+ if not name:
+ name = self.name
+
+ file_prefix = '' if not name else f'{name}-'
+ pub_path = os.path.join(self.path, f'{file_prefix}public_key.pem')
+ prv_path = os.path.join(self.path, f'{file_prefix}private_key.pem')
+
+ if not force:
+ if os.path.isfile(pub_path):
+ logger.debug(
+ 'PEM generation skipped; Public Key already exists: %s/%s',
+ os.path.dirname(pub_path), os.path.basename(pub_path))
+ return False
+
+ if os.path.isfile(prv_path):
+ logger.debug(
+ 'PEM generation skipped; Private Key already exists: %s%s',
+ os.path.dirname(prv_path), os.path.basename(prv_path))
+ return False
+
+ try:
+ # Write our keys to disk
+ with open(pub_path, 'wb') as f:
+ f.write(public_key)
+
+ except OSError as e:
+ logger.warning('Error writing Public PEM file %s', pub_path)
+ logger.debug(f'I/O Exception: {e}')
+
+ # Cleanup
+ try:
+ os.unlink(pub_path)
+ logger.trace('Removed %s', pub_path)
+
+ except OSError:
+ pass
+
+ return False
+
+ try:
+ with open(prv_path, 'wb') as f:
+ f.write(private_key)
+
+ except OSError as e:
+ logger.warning('Error writing Private PEM file %s', prv_path)
+ logger.debug(f'I/O Exception: {e}')
+
+ try:
+ os.unlink(pub_path)
+ logger.trace('Removed %s', pub_path)
+
+ except OSError:
+ pass
+
+ try:
+ os.unlink(prv_path)
+ logger.trace('Removed %s', prv_path)
+
+ except OSError:
+ pass
+
+ return False
+
+ # Update our local file references
+ self._prv_keyfile = AppriseAttachment(asset=self.asset)
+ self._prv_keyfile.add(prv_path)
+
+ self._pub_keyfile = AppriseAttachment(asset=self.asset)
+ self._pub_keyfile.add(pub_path)
+
+ logger.info(
+ 'Wrote Public/Private PEM key pair for %s/%s',
+ os.path.dirname(pub_path),
+ os.path.basename(pub_path))
+ return True
+
+ def public_keyfile(self, *names: str) -> Optional[str]:
+ """
+ Returns the first match of a useable public key based names provided
+ """
+
+ if not PEM_SUPPORT:
+ msg = 'PEM Support unavailable; install cryptography library'
+ logger.warning(msg)
+ raise ApprisePEMException(msg)
+
+ if self._pub_keyfile:
+ # If our code reaches here, then we fetch our public key
+ pem_key = self._pub_keyfile[0]
+ if not pem_key:
+ # We could not access the attachment
+ logger.error(
+ 'Could not access PEM Public Key {}.'.format(
+ pem_key.url(privacy=True)))
+ return False
+
+ return pem_key.path
+
+ elif not self.path:
+ # No path
+ return None
+
+ fnames = [
+ 'public_key.pem',
+ 'public.pem',
+ 'pub.pem',
+ ]
+
+ if self.name:
+ # Include our name in the list
+ fnames = [self.name] + [*names]
+
+ for name in names:
+ fnames.insert(0, f'{name}-public_key.pem')
+
+ _entry = name.lower()
+ fnames.insert(0, f'{_entry}-public_key.pem')
+
+ return next(
+ (os.path.join(self.path, fname)
+ for fname in fnames
+ if os.path.isfile(os.path.join(self.path, fname))),
+ None)
+
+ def private_keyfile(self, *names: str) -> Optional[str]:
+ """
+ Returns the first match of a useable private key based names provided
+ """
+
+ if not PEM_SUPPORT:
+ msg = 'PEM Support unavailable; install cryptography library'
+ logger.warning(msg)
+ raise ApprisePEMException(msg)
+
+ if self._prv_keyfile:
+ # If our code reaches here, then we fetch our private key
+ pem_key = self._prv_keyfile[0]
+ if not pem_key:
+ # We could not access the attachment
+ logger.error(
+ 'Could not access PEM Private Key {}.'.format(
+ pem_key.url(privacy=True)))
+ return False
+
+ return pem_key.path
+
+ elif not self.path:
+ # No path
+ return None
+
+ fnames = [
+ 'private_key.pem',
+ 'private.pem',
+ 'prv.pem',
+ ]
+
+ if self.name:
+ # Include our name in the list
+ fnames = [self.name] + [*names]
+
+ for name in names:
+ fnames.insert(0, f'{name}-private_key.pem')
+
+ _entry = name.lower()
+ fnames.insert(0, f'{_entry}-private_key.pem')
+
+ return next(
+ (os.path.join(self.path, fname)
+ for fname in fnames
+ if os.path.isfile(os.path.join(self.path, fname))),
+ None)
+
+ def public_key(self, *names: str, autogen: Optional[bool] = None,
+ autodetect: bool = True
+ ) -> Optional['ec.EllipticCurvePublicKey']:
+ """
+ Opens a spcified pem public file and returns the key from it which
+ is used to decrypt the message
+ """
+ if self.__public_key or not autodetect:
+ return self.__public_key
+
+ path = self.public_keyfile(*names)
+ if not path:
+ if (autogen if autogen is not None else self.asset.pem_autogen) \
+ and self.keygen(*names):
+ path = self.public_keyfile(*names)
+ if path:
+ # We should get a hit now
+ return self.public_key(autogen=False)
+
+ logger.warning('No PEM Public Key could be loaded')
+ return None
+
+ return self.__public_key if (
+ self.load_public_key(path) or
+ # Try to see if we can load a private key (which we can generate a
+ # public from)
+ self.private_key(*names, autogen=autogen)) else None
+
+ def private_key(self, *names: str, autogen: Optional[bool] = None,
+ autodetect: bool = True
+ ) -> Optional['ec.EllipticCurvePrivateKey']:
+ """
+ Opens a spcified pem private file and returns the key from it which
+ is used to encrypt the message
+ """
+ if self.__private_key or not autodetect:
+ return self.__private_key
+
+ path = self.private_keyfile(*names)
+ if not path:
+ if (autogen if autogen is not None else self.asset.pem_autogen) \
+ and self.keygen(*names):
+ path = self.private_keyfile(*names)
+ if path:
+ # We should get a hit now
+ return self.private_key(autogen=False)
+
+ logger.warning('No PEM Private Key could be loaded')
+ return None
+
+ return self.__private_key if self.load_private_key(path) else None
+
+ def encrypt_webpush(self, message: Union[str, bytes],
+ public_key: 'ec.EllipticCurvePublicKey',
+ auth_secret: bytes) -> bytes:
+ """
+ Encrypt a WebPush message using the recipient's public key and auth
+ secret.
+
+ Accepts input message as str or bytes.
+ """
+ if isinstance(message, str):
+ message = message.encode('utf-8')
+
+ # 1. Generate ephemeral EC private/Public key
+ ephemeral_private_key = \
+ ec.generate_private_key(ec.SECP256R1(), default_backend())
+ ephemeral_public_key = ephemeral_private_key.public_key().public_bytes(
+ encoding=Encoding.X962, format=PublicFormat.UncompressedPoint)
+
+ # 2. Random salt
+ salt = os.urandom(16)
+
+ # 3. Generate shared secret via ECDH
+ shared_secret = ephemeral_private_key.exchange(ec.ECDH(), public_key)
+
+ # 4. Derive PRK using HKDF (first phase)
+ recipient_public_key_bytes = public_key.public_bytes(
+ encoding=Encoding.X962,
+ format=PublicFormat.UncompressedPoint,
+ )
+
+ # 5. Derive Encryption key
+ hkdf_secret = HKDF(
+ algorithm=hashes.SHA256(),
+ length=32,
+ salt=auth_secret,
+ info=b"WebPush: info\x00" +
+ recipient_public_key_bytes + ephemeral_public_key,
+ backend=default_backend(),
+ ).derive(shared_secret)
+
+ # 6. Derive Content Encryption Key
+ hkdf_key = HKDF(
+ algorithm=hashes.SHA256(),
+ length=16,
+ salt=salt,
+ info=b"Content-Encoding: aes128gcm\x00",
+ backend=default_backend(),
+ ).derive(hkdf_secret)
+
+ # 7. Derive Nonce
+ hkdf_nonce = HKDF(
+ algorithm=hashes.SHA256(),
+ length=12,
+ salt=salt,
+ info=b"Content-Encoding: nonce\x00",
+ backend=default_backend(),
+ ).derive(hkdf_secret)
+
+ # 8. Encrypt the message
+ aesgcm = AESGCM(hkdf_key)
+ # RFC8291 requires us to add '\0x02' byte to end of message
+ ciphertext = aesgcm.encrypt(
+ hkdf_nonce, message + b"\x02", associated_data=None)
+
+ # 9. Build WebPush header + payload
+ header = salt
+ header += struct.pack("!L", self.max_webpush_record_size)
+ header += struct.pack("!B", len(ephemeral_public_key))
+ header += ephemeral_public_key
+ header += ciphertext
+
+ return header
+
+ def encrypt(self,
+ message: Union[str, bytes],
+ public_key: 'Optional[ec.EllipticCurvePublicKey]' = None,
+ salt: Optional[bytes] = None) -> Optional[str]:
+ """
+ Encrypts a message using the recipient's public key (or self public
+ key if none provided). Message can be str or bytes.
+ """
+
+ if not PEM_SUPPORT:
+ msg = 'PEM Support unavailable; install cryptography library'
+ logger.warning(msg)
+ raise ApprisePEMException(msg)
+
+ # 1. Handle string vs bytes input
+ if isinstance(message, str):
+ message = message.encode('utf-8')
+
+ # 2. Select public key
+ if public_key is None:
+ public_key = self.public_key()
+ if public_key is None:
+ logger.debug("No public key available for encryption.")
+ return None
+
+ # 3. Generate ephemeral EC private key
+ ephemeral_private_key = ec.generate_private_key(ec.SECP256R1())
+
+ # 4. Derive shared secret
+ shared_secret = ephemeral_private_key.exchange(ec.ECDH(), public_key)
+
+ # 5. Derive symmetric AES key using HKDF
+ derived_key = HKDF(
+ algorithm=hashes.SHA256(),
+ length=32,
+ salt=salt, # Allow salt=None if not provided
+ info=b'ecies-encryption',
+ backend=default_backend()
+ ).derive(shared_secret)
+
+ # 6. Encrypt the message using AES-GCM
+ iv = os.urandom(12) # 96-bit random IV for GCM
+ encryptor = Cipher(
+ algorithms.AES(derived_key),
+ modes.GCM(iv),
+ backend=default_backend()
+ ).encryptor()
+
+ ciphertext = encryptor.update(message) + encryptor.finalize()
+ tag = encryptor.tag
+
+ # 7. Serialize ephemeral public key as X9.62 Uncompressed Point
+ ephemeral_public_key_bytes = \
+ ephemeral_private_key.public_key().public_bytes(
+ encoding=serialization.Encoding.X962,
+ format=serialization.PublicFormat.UncompressedPoint,
+ )
+
+ # 8. Combine everything cleanly
+ full_payload = {
+ "ephemeral_pubkey": base64_urlencode(ephemeral_public_key_bytes),
+ "iv": base64_urlencode(iv),
+ "tag": base64_urlencode(tag),
+ "ciphertext": base64_urlencode(ciphertext),
+ }
+
+ return base64.b64encode(
+ json.dumps(full_payload).encode('utf-8')
+ ).decode('utf-8')
+
+ def decrypt(self,
+ encrypted_payload: Union[str, bytes],
+ private_key: 'Optional[ec.EllipticCurvePrivateKey]' = None,
+ salt: Optional[bytes] = None) -> Optional[str]:
+ """
+ Decrypts a message using the provided private key or fallback to
+ self's private key.
+
+ Payload is the base64-encoded JSON from encrypt().
+ """
+
+ if not PEM_SUPPORT:
+ msg = 'PEM Support unavailable; install cryptography library'
+ logger.warning(msg)
+ raise ApprisePEMException(msg)
+
+ # 1. Parse input
+ try:
+ if isinstance(encrypted_payload, str):
+ payload_bytes = base64.b64decode(
+ encrypted_payload.encode('utf-8'))
+
+ else:
+ payload_bytes = base64.b64decode(encrypted_payload)
+
+ except binascii.Error:
+ # Bad Padding
+ logger.debug("Unparseable encrypted content provided")
+ return None
+
+ try:
+ payload = json.loads(payload_bytes.decode('utf-8'))
+
+ except UnicodeDecodeError:
+ logger.debug("Unparseable encrypted content provided")
+ return None
+
+ ephemeral_pubkey_bytes = base64_urldecode(payload["ephemeral_pubkey"])
+ iv = base64_urldecode(payload["iv"])
+ tag = base64_urldecode(payload["tag"])
+ ciphertext = base64_urldecode(payload["ciphertext"])
+
+ # 2. Select private key
+ if private_key is None:
+ private_key = self.private_key()
+ if private_key is None:
+ logger.debug("No private key available for decryption.")
+ return None
+
+ # 3. Load ephemeral public key from sender
+ ephemeral_pubkey = ec.EllipticCurvePublicKey.from_encoded_point(
+ ec.SECP256R1(),
+ ephemeral_pubkey_bytes
+ )
+
+ # 4. ECDH shared secret
+ shared_secret = private_key.exchange(ec.ECDH(), ephemeral_pubkey)
+
+ # 5. Derive symmetric AES key with HKDF
+ derived_key = HKDF(
+ algorithm=hashes.SHA256(),
+ length=32,
+ salt=salt,
+ info=b'ecies-encryption',
+ ).derive(shared_secret)
+
+ # 6. Decrypt using AES-GCM
+ decryptor = Cipher(
+ algorithms.AES(derived_key),
+ modes.GCM(iv, tag),
+ ).decryptor()
+
+ try:
+ plaintext = decryptor.update(ciphertext) + decryptor.finalize()
+
+ except InvalidTag:
+ logger.debug("Decryption failed - Authentication Mismatch")
+ # Reason for Error:
+ # - Mismatched or missing salt
+ # - Mismatched iv, tag, or ciphertext
+ # - Incorrect or corrupted ephemeral_pubkey
+ # - Wrong or incomplete key derivation
+ # - Data being altered between encryption and decryption
+ # (truncated/corrupted)
+
+ # Basically if we get here, we tried to decrypt encrypted content
+ # using the wrong key.
+ return None
+
+ # 7. Return decoded message
+ return plaintext.decode('utf-8')
+
+ def sign(self, content: bytes) -> Optional[bytes]:
+ """
+ Sign the message using ES256 (ECDSA w/ SHA256) via private key
+ """
+
+ try:
+ # Sign the message using ES256 (ECDSA w/ SHA256)
+ der_sig = self.private_key()\
+ .sign(content, ec.ECDSA(hashes.SHA256()))
+
+ except AttributeError:
+ # NoneType; could not load key
+ return None
+
+ # Convert DER to raw R||S
+ r, s = decode_dss_signature(der_sig)
+ return r.to_bytes(
+ 32, byteorder='big') + s.to_bytes(32, byteorder='big')
+
+ @property
+ def pub_keyfile(self) -> Optional[Union[str, bool]]:
+ """
+ Returns the Public Keyfile Path if set otherwise it returns None
+ This property returns False if a keyfile was provided, but was invalid
+ """
+ return None if not self._pub_keyfile \
+ else (False if not self._pub_keyfile[0]
+ else self._pub_keyfile[0].path)
+
+ @property
+ def prv_keyfile(self) -> Optional[Union[str, bool]]:
+ """
+ Returns the Private Keyfile Path if set otherwise it returns None
+ This property returns False if a keyfile was provided, but was invalid
+ """
+ return None if not self._prv_keyfile \
+ else (False if not self._prv_keyfile[0]
+ else self._prv_keyfile[0].path)
+
+ @property
+ def x962_str(self) -> str:
+ """
+ X962 serialization based on public key
+ """
+ try:
+ return base64_urlencode(
+ self.public_key().public_bytes(
+ encoding=serialization.Encoding.X962,
+ format=serialization.PublicFormat.UncompressedPoint)
+ )
+ except AttributeError:
+ # Public Key could not be generated (public_key() returned None)
+ return ''
+
+ def __bool__(self) -> bool:
+ """
+ Returns True if at least 1 key was loaded
+ """
+ return True if (self.private_key() or self.public_key()) else False
diff --git a/packaging/redhat/python-apprise.spec b/packaging/redhat/python-apprise.spec
index a396da89..1a1f078c 100644
--- a/packaging/redhat/python-apprise.spec
+++ b/packaging/redhat/python-apprise.spec
@@ -43,7 +43,7 @@ Africas Talking, Apprise API, APRS, AWS SES, AWS SNS, Bark, BlueSky, Burst SMS,
BulkSMS, BulkVS, Chanify, Clickatell, ClickSend, DAPNET, DingTalk, Discord, E-Mail, Emby,
FCM, Feishu, Flock, Free Mobile, Google Chat, Gotify, Growl, Guilded, Home
Assistant, httpSMS, IFTTT, Join, Kavenegar, KODI, Kumulos, LaMetric, Line,
-LunaSea, MacOSX, Mailgun, Mastodon, Mattermost,Matrix, MessageBird, Microsoft
+LunaSea, MacOSX, Mailgun, Mastodon, Mattermost, Matrix, MessageBird, Microsoft
Windows, Microsoft Teams, Misskey, MQTT, MSG91, MyAndroid, Nexmo, Nextcloud,
NextcloudTalk, Notica, Notifiarr, Notifico, ntfy, Office365, OneSignal,
Opsgenie, PagerDuty, PagerTree, ParsePlatform, Plivo, PopcornNotify, Prowl,
@@ -51,8 +51,8 @@ Pushalot, PushBullet, Pushjet, PushMe, Pushover, PushSafer, Pushy, PushDeer,
Revolt, Reddit, Resend, Rocket.Chat, RSyslog, SendGrid, ServerChan, Seven, SFR,
Signal, SimplePush, Sinch, Slack, SMSEagle, SMS Manager, SMTP2Go, SparkPost,
Splunk, Super Toasty, Streamlabs, Stride, Synology Chat, Syslog, Techulus Push,
-Telegram, Threema Gateway, Twilio, Twitter, Twist, VictorOps, Voipms, Vonage,
-WeCom Bot, WhatsApp, Webex Teams, Workflows, WxPusher, XBMC}
+Telegram, Threema Gateway, Twilio, Twitter, Twist, Vapid, VictorOps, Voipms,
+Vonage, WebPush, WeCom Bot, WhatsApp, Webex Teams, Workflows, WxPusher, XBMC}
Name: python-%{pypi_name}
Version: 1.9.3
diff --git a/test/test_apprise_utils.py b/test/test_apprise_utils.py
index 29649d18..28491184 100644
--- a/test/test_apprise_utils.py
+++ b/test/test_apprise_utils.py
@@ -2752,6 +2752,30 @@ def test_cwe312_url():
'#random') == 'slack://test@B...4/J...M/X...3/'
+def test_base64_encode_decode():
+ """
+ Utils:Base64:URLEncode & Decode
+
+ """
+ assert utils.base64.base64_urlencode(None) is None
+ assert utils.base64.base64_urlencode(42) is None
+ assert utils.base64.base64_urlencode(object) is None
+ assert utils.base64.base64_urlencode({}) is None
+ assert utils.base64.base64_urlencode("") is None
+ assert utils.base64.base64_urlencode("abc") is None
+ assert utils.base64.base64_urlencode(b"") == ''
+ assert utils.base64.base64_urlencode(b"abc") == 'YWJj'
+
+ assert utils.base64.base64_urldecode(None) is None
+ assert utils.base64.base64_urldecode(42) is None
+ assert utils.base64.base64_urldecode(object) is None
+ assert utils.base64.base64_urldecode({}) is None
+
+ assert utils.base64.base64_urldecode("abc") == b'i\xb7'
+ assert utils.base64.base64_urldecode("") == b''
+ assert utils.base64.base64_urldecode('YWJj') == b'abc'
+
+
def test_dict_base64_codec(tmpdir):
"""
Test encoding/decoding of base64 content
diff --git a/test/test_attach_file.py b/test/test_attach_file.py
index d1205671..370969d2 100644
--- a/test/test_attach_file.py
+++ b/test/test_attach_file.py
@@ -248,6 +248,12 @@ def test_attach_file():
# Test hosted configuration and that we can't add a valid file
aa = AppriseAttachment(location=ContentLocation.HOSTED)
+ # No entries defined yet
+ assert bool(aa) is False
+ assert aa.sync() is False
+
+ # Entry count does not impact sync if told to act that way
+ assert aa.sync(abort_if_empty=False) is True
assert aa.add(path) is False
response = AppriseAttachment.instantiate(path)
diff --git a/test/test_plugin_vapid.py b/test/test_plugin_vapid.py
new file mode 100644
index 00000000..50dd1c20
--- /dev/null
+++ b/test/test_plugin_vapid.py
@@ -0,0 +1,697 @@
+# -*- coding: utf-8 -*-
+# BSD 2-Clause License
+#
+# Apprise - Push Notification Library.
+# Copyright (c) 2025, Chris Caron
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice,
+# this list of conditions and the following disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
+# and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+
+import os
+import sys
+import json
+import requests
+import pytest
+from unittest import mock
+
+from apprise.plugins.vapid.subscription import (
+ WebPushSubscription, WebPushSubscriptionManager)
+from apprise.plugins.vapid import NotifyVapid
+from apprise import exception, asset, url
+from apprise.common import PersistentStoreMode
+from apprise.utils.pem import ApprisePEMController
+from helpers import AppriseURLTester
+
+# Disable logging for a cleaner testing output
+import logging
+logging.disable(logging.CRITICAL)
+
+# Attachment Directory
+TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var')
+
+# a test UUID we can use
+SUBSCRIBER = 'user@example.com'
+
+PLUGIN_ID = 'vapid'
+
+# Our Testing URLs
+apprise_url_tests = (
+ ('vapid://', {
+ 'instance': TypeError,
+ }),
+ ('vapid://:@/', {
+ 'instance': TypeError,
+ }),
+ ('vapid://invalid-subscriber', {
+ # An invalid Subscriber
+ 'instance': TypeError,
+ }),
+ ('vapid://user@example.com', {
+ # bare bone requirements met, but we don't have our subscription file
+ # or our private key (pem)
+ 'instance': NotifyVapid,
+ # We'll fail to respond because we would not have found any
+ # configuration to load
+ 'notify_response': False,
+ }),
+ ('vapid://user@example.com?keyfile=invalid&subfile=invalid', {
+ # Test passing keyfile and subfile on our path (even if invalid)
+ 'instance': NotifyVapid,
+ # We'll fail to respond because we would not have found any
+ # configuration to load
+ 'notify_response': False,
+ }),
+ ('vapid://user@example.com/newuser@example.com', {
+ # we don't have our subscription file or private key
+ 'instance': NotifyVapid,
+ 'notify_response': False,
+ }),
+ ('vapid://user@example.ca/newuser@example.ca', {
+ 'instance': NotifyVapid,
+ # force a failure
+ 'response': False,
+ 'requests_response_code': requests.codes.internal_server_error,
+ }),
+ ('vapid://user@example.uk/newuser@example.uk', {
+ 'instance': NotifyVapid,
+ # throw a bizzare code forcing us to fail to look it up
+ 'response': False,
+ 'requests_response_code': 999,
+ }),
+ ('vapid://user@example.au/newuser@example.au', {
+ 'instance': NotifyVapid,
+ # Throws a series of connection and transfer exceptions when this flag
+ # is set and tests that we gracfully handle them
+ 'test_requests_exceptions': True,
+ }),
+)
+
+
+@pytest.fixture
+def patch_persistent_store_namespace(tmpdir):
+ """
+ Force an easy to test environment
+ """
+ with mock.patch.object(url.URLBase, 'url_id', return_value=PLUGIN_ID), \
+ mock.patch.object(
+ asset.AppriseAsset, 'storage_mode',
+ PersistentStoreMode.AUTO), \
+ mock.patch.object(
+ asset.AppriseAsset, 'storage_path', str(tmpdir)):
+
+ tmp_dir = tmpdir.mkdir(PLUGIN_ID)
+ # Return the directory name
+ yield str(tmp_dir)
+
+
+@pytest.fixture
+def subscription_reference():
+ return {
+ "user@example.com": {
+ "endpoint": 'https://fcm.googleapis.com/fcm/send/default',
+ "keys": {
+ "p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
+ '5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
+ "auth": 'k9Xzm43nBGo=',
+ },
+ },
+ "user1": {
+ "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
+ "keys": {
+ "p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
+ '5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
+ "auth": 'k9Xzm43nBGo=',
+ },
+ },
+ "user2": {
+ "endpoint": 'https://fcm.googleapis.com/fcm/send/def456',
+ "keys": {
+ "p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
+ '5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
+ "auth": 'k9Xzm43nBGo=',
+ },
+ },
+ }
+
+
+@pytest.mark.skipif(
+ 'cryptography' not in sys.modules, reason="Requires cryptography")
+def test_plugin_vapid_urls():
+ """
+ NotifyVapid() Apprise URLs - No Config
+
+ """
+
+ # Run our general tests
+ AppriseURLTester(tests=apprise_url_tests).run_all()
+
+
+@pytest.mark.skipif(
+ 'cryptography' not in sys.modules, reason="Requires cryptography")
+def test_plugin_vapid_urls_with_required_assets(
+ patch_persistent_store_namespace, subscription_reference):
+ """
+ NotifyVapid() Apprise URLs With Config
+ """
+
+ # Determine our store
+ pc = ApprisePEMController(path=patch_persistent_store_namespace)
+ assert pc.keygen() is True
+
+ # Write our subscriptions file to disk
+ subscription_file = os.path.join(
+ patch_persistent_store_namespace,
+ NotifyVapid.vapid_subscription_file)
+
+ with open(subscription_file, 'w') as f:
+ f.write(json.dumps(subscription_reference))
+
+ tests = (
+ ('vapid://user@example.com', {
+ # user@example.com loaded (also used as subscriber id)
+ 'instance': NotifyVapid,
+ }),
+ ('vapid://user@example.com/newuser@example.com', {
+ # no newuser@example.com key entry
+ 'instance': NotifyVapid,
+ 'notify_response': False,
+ }),
+ ('vapid://user@example.com/user1?to=user2', {
+ # We'll succesfully notify 2 users
+ 'instance': NotifyVapid,
+ }),
+ ('vapid://user1?to=user2&from=user@example.com', {
+ # We'll succesfully notify 2 users
+ 'instance': NotifyVapid,
+ }),
+ ('vapid://?to=user2&from=user@example.com', {
+ # No host provided
+ 'instance': NotifyVapid,
+ }),
+ ('vapid://user@example.com?to=user2&from=user@example.com', {
+ # We'll succesfully notify 2 users
+ 'instance': NotifyVapid,
+ }),
+ ('vapid://user@example.com/user1?to=user2&ttl=15', {
+ # test ttl
+ 'instance': NotifyVapid,
+ }),
+ ('vapid://user@example.com/user1?to=user2&ttl=', {
+ # test ttl
+ 'instance': NotifyVapid,
+ }),
+ ('vapid://user@example.com/user1?to=user2&ttl=invalid', {
+ # test ttl
+ 'instance': NotifyVapid,
+ }),
+ ('vapid://user@example.com/user1?to=user2&ttl=-4000', {
+ # bad ttl
+ 'instance': TypeError,
+ }),
+ ('vapid://user@example.com/user1?to=user2&mode=edge', {
+ # test mode
+ 'instance': NotifyVapid,
+ }),
+ ('vapid://user@example.com/user1?to=user2&mode=', {
+ # test mode
+ 'instance': TypeError,
+ }),
+ ('vapid://user@example.com/user1?to=user2&mode=invalid', {
+ # test mode more
+ 'instance': TypeError,
+ }),
+ ('vapid://user@example.com/user1', {
+ 'instance': NotifyVapid,
+ # force a failure
+ 'response': False,
+ 'requests_response_code': requests.codes.internal_server_error,
+ }),
+ ('vapid://user@example.com/user1', {
+ 'instance': NotifyVapid,
+ # throw a bizzare code forcing us to fail to look it up
+ 'response': False,
+ 'requests_response_code': 999,
+ }),
+ ('vapid://user@example.com/user1', {
+ 'instance': NotifyVapid,
+ # Throws a series of connection and transfer exceptions
+ # when this flag is set and tests that we gracfully handle them
+ 'test_requests_exceptions': True,
+ }),
+ )
+
+ AppriseURLTester(tests=tests).run_all()
+
+
+@pytest.mark.skipif(
+ 'cryptography' not in sys.modules, reason="Requires cryptography")
+def test_plugin_vapid_subscriptions(tmpdir):
+ """
+ NotifyVapid() Subscriptions
+
+ """
+
+ # Temporary directory
+ tmpdir0 = tmpdir.mkdir('tmp00')
+
+ with pytest.raises(exception.AppriseInvalidData):
+ # Integer not supported
+ WebPushSubscription(42)
+
+ with pytest.raises(exception.AppriseInvalidData):
+ # Not the correct format
+ WebPushSubscription('bad-content')
+
+ with pytest.raises(exception.AppriseInvalidData):
+ # Invalid JSON
+ WebPushSubscription('{')
+
+ with pytest.raises(exception.AppriseInvalidData):
+ # Empty Dictionary
+ WebPushSubscription({})
+
+ with pytest.raises(exception.AppriseInvalidData):
+ WebPushSubscription({
+ "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
+ "keys": {
+ "p256dh": 'BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR=',
+ "auth": 42,
+ },
+ })
+
+ with pytest.raises(exception.AppriseInvalidData):
+ WebPushSubscription({
+ "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
+ "keys": {
+ "p256dh": 42,
+ "auth": 'k9Xzm43nBGo=',
+ },
+ })
+
+ with pytest.raises(exception.AppriseInvalidData):
+ WebPushSubscription({
+ "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
+ })
+
+ with pytest.raises(exception.AppriseInvalidData):
+ WebPushSubscription({
+ "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
+ "keys": {},
+ })
+
+ with pytest.raises(exception.AppriseInvalidData):
+ # Invalid p256dh public key provided
+ wps = WebPushSubscription({
+ "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
+ "keys": {
+ "p256dh": 'BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR=',
+ "auth": 'k9Xzm43nBGo=',
+ },
+ })
+
+ # An empty object
+ wps = WebPushSubscription()
+ assert bool(wps) is False
+ assert isinstance(wps.json(), str)
+ assert json.loads(wps.json())
+ assert str(wps) == ''
+ assert wps.auth is None
+ assert wps.endpoint is None
+ assert wps.p256dh is None
+ assert wps.public_key is None
+ # We can't write anything as there is nothing loaded
+ assert wps.write(os.path.join(str(tmpdir0), 'subscriptions.json')) is False
+
+ # A valid key
+ wps = WebPushSubscription({
+ "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
+ "keys": {
+ "p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
+ '5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
+ "auth": 'k9Xzm43nBGo=',
+ },
+ })
+
+ assert bool(wps) is True
+ assert isinstance(wps.json(), str)
+ assert json.loads(wps.json())
+ assert str(wps) == 'abc123'
+ assert wps.auth == 'k9Xzm43nBGo='
+ assert wps.endpoint == 'https://fcm.googleapis.com/fcm/send/abc123'
+ assert wps.p256dh == 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO' \
+ '5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0'
+ assert wps.public_key is not None
+
+ # Currently no files here
+ assert os.listdir(str(tmpdir0)) == []
+
+ # Bad content
+ assert wps.write(object) is False
+ assert wps.write(None) is False
+ # Can't write to a name already taken by as a directory
+ assert wps.write(str(tmpdir0)) is False
+ # Can't write to a name already taken by as a directory
+ assert wps.write(os.path.join(str(tmpdir0), 'subscriptions.json')) is True
+ assert os.listdir(str(tmpdir0)) == ['subscriptions.json']
+
+
+@pytest.mark.skipif(
+ 'cryptography' in sys.modules,
+ reason="Requires that cryptography NOT be installed")
+def test_plugin_vapid_subscriptions_without_c():
+ """
+ NotifyVapid() Subscriptions (no Cryptography)
+
+ """
+ with pytest.raises(exception.AppriseInvalidData):
+ # A valid key that can't be loaded because crytography is missing
+ WebPushSubscription({
+ "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
+ "keys": {
+ "p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
+ '5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
+ "auth": 'k9Xzm43nBGo=',
+ },
+ })
+
+
+@pytest.mark.skipif(
+ 'cryptography' not in sys.modules, reason="Requires cryptography")
+def test_plugin_vapid_subscription_manager(tmpdir):
+ """
+ NotifyVapid() Subscription Manager
+
+ """
+
+ # Temporary directory
+ tmpdir0 = tmpdir.mkdir('tmp00')
+
+ with pytest.raises(exception.AppriseInvalidData):
+ # An invalid object
+ smgr = WebPushSubscriptionManager()
+ smgr['abc'] = 'invalid'
+
+ with pytest.raises(exception.AppriseInvalidData):
+ # An invalid object
+ smgr = WebPushSubscriptionManager()
+ smgr += 'invalid'
+
+ smgr = WebPushSubscriptionManager()
+
+ assert bool(smgr) is False
+ assert len(smgr) == 0
+
+ sub = {
+ "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
+ "keys": {
+ "p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
+ '5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
+ "auth": 'k9Xzm43nBGo=',
+ },
+ }
+
+ assert smgr.add(sub) is True
+ assert bool(smgr) is True
+ assert len(smgr) == 1
+
+ # Same sub (overwrites same slot)
+ smgr += sub
+ assert bool(smgr) is True
+ assert len(smgr) == 1
+
+ # This makes a copy
+ smgr['abc'] = smgr['abc123']
+ assert bool(smgr) is True
+ assert len(smgr) == 2
+
+ assert isinstance(smgr['abc123'], WebPushSubscription)
+
+ # Currently no files here
+ assert os.listdir(str(tmpdir0)) == []
+
+ # Write our content
+ assert smgr.write(
+ os.path.join(str(tmpdir0), 'subscriptions.json')) is True
+
+ assert os.listdir(str(tmpdir0)) == ['subscriptions.json']
+
+ # Reset our object
+ smgr.clear()
+ assert bool(smgr) is False
+ assert len(smgr) == 0
+
+ # Load our content back
+ assert smgr.load(
+ os.path.join(str(tmpdir0), 'subscriptions.json')) is True
+ assert bool(smgr) is True
+ assert len(smgr) == 2
+
+ # Write over our file using the standard Subscription format
+ assert smgr['abc123'].write(
+ os.path.join(str(tmpdir0), 'subscriptions.json')) is True
+
+ # We can still open this type as well
+ assert smgr.load(
+ os.path.join(str(tmpdir0), 'subscriptions.json')) is True
+ assert bool(smgr) is True
+ assert len(smgr) == 1
+
+ smgr.clear()
+ bad_entry = {
+ "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
+ "keys": {
+ "p256dh": 'invalid',
+ "auth": 'garbage',
+ },
+ }
+
+ subscriptions = os.path.join(str(tmpdir0), 'subscriptions.json')
+ with open(subscriptions, 'w', encoding='utf-8') as f:
+ # A bad JSON file
+ f.write('{')
+ assert smgr.load(subscriptions) is False
+
+ with open(subscriptions, 'w', encoding='utf-8') as f:
+ # not expected dictionary
+ f.write('null')
+ assert smgr.load(subscriptions) is False
+
+ subscriptions = os.path.join(str(tmpdir0), 'subscriptions.json')
+ with open(subscriptions, 'w', encoding='utf-8') as f:
+ json.dump(bad_entry, f)
+ assert smgr.load(subscriptions) is False
+
+ # Create bad data
+ bad_data = {
+ 'bad1': bad_entry,
+ 'bad2': bad_entry,
+ 'bad3': bad_entry,
+ 'bad4': bad_entry,
+ }
+ subscriptions = os.path.join(str(tmpdir0), 'subscriptions.json')
+ with open(subscriptions, 'w', encoding='utf-8') as f:
+ json.dump(bad_data, f)
+ assert smgr.load(subscriptions) is False
+ assert smgr.load('invalid-file') is False
+
+
+@pytest.mark.skipif(
+ 'cryptography' not in sys.modules, reason="Requires cryptography")
+@mock.patch('requests.post')
+def test_plugin_vapid_initializations(mock_post, tmpdir):
+ """
+ NotifyVapid() Initializations
+
+ """
+
+ # Assign our mock object our return value
+ okay_response = requests.Request()
+ okay_response.status_code = requests.codes.ok
+ okay_response.content = ""
+ mock_post.return_value = okay_response
+
+ # Temporary directory
+ tmpdir0 = tmpdir.mkdir('tmp00')
+
+ # Write our subfile
+ smgr = WebPushSubscriptionManager()
+ sub = {
+ "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
+ "keys": {
+ "p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
+ '5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
+ "auth": 'k9Xzm43nBGo=',
+ },
+ }
+ subfile = os.path.join(str(tmpdir0), 'subscriptions.json')
+ assert smgr.add(sub) is True
+ assert smgr.add(smgr['abc123']) is True
+ assert os.listdir(str(tmpdir0)) == []
+
+ with mock.patch('json.dump', side_effect=OSError):
+ # We will fial to write
+ assert smgr.write(subfile) is False
+
+ assert smgr.write(subfile) is True
+ assert os.listdir(str(tmpdir0)) == ['subscriptions.json']
+ assert isinstance(smgr.json(), str)
+
+ _asset = asset.AppriseAsset(
+ storage_mode=PersistentStoreMode.FLUSH,
+ storage_path=str(tmpdir0),
+ # Auto-gen our private/public key pair
+ pem_autogen=True,
+ )
+
+ # Auto-Key Generation
+ obj = NotifyVapid(
+ 'user@example.ca', targets=['abc123', ], subfile=subfile,
+ asset=_asset)
+ assert isinstance(obj, NotifyVapid)
+ # Our subscription directory + our
+ # persistent store where our keys were generated
+ assert len(os.listdir(str(tmpdir0))) == 2
+
+ # Second call re-references keys previously generated
+ obj = NotifyVapid(
+ 'user@example.ca', targets=['abc123', ], subfile=subfile,
+ asset=_asset)
+ assert isinstance(obj, NotifyVapid)
+ assert isinstance(obj.url(), str)
+ assert obj.send('test') is True
+ # A second message makes no difference; what is loaded into memory is used
+ assert obj.send('test') is True
+
+ obj = NotifyVapid(
+ 'user@example.ca', targets=['abc123', ], subfile='/a/bad/path',
+ asset=_asset)
+ assert isinstance(obj, NotifyVapid)
+ assert isinstance(obj.url(), str)
+ assert obj.send('test') is False
+ # A second message makes no difference; what is loaded into memory is used
+ assert obj.send('test') is False
+
+ # Detect our keyfile
+ cache_dir = [x for x in os.listdir(str(tmpdir0))
+ if not x.endswith('subscriptions.json')][0]
+
+ # Test fixed assignment to our keyfile
+ keyfile = os.path.join(str(tmpdir0), cache_dir, 'private_key.pem')
+ assert os.path.exists(keyfile)
+ obj = NotifyVapid(
+ 'user@example.ca', targets=['abc123', ], keyfile=keyfile,
+ subfile=subfile, asset=_asset)
+ assert isinstance(obj, NotifyVapid)
+ assert isinstance(obj.url(), str)
+ assert obj.send('test') is True
+ # A second message makes no difference; what is loaded into memory is used
+ assert obj.send('test') is True
+
+ # Invalid Keyfile
+ obj = NotifyVapid(
+ 'user@example.ca', targets=['abc123', ], keyfile=subfile,
+ subfile=subfile, asset=_asset)
+ assert isinstance(obj, NotifyVapid)
+ assert isinstance(obj.url(), str)
+ assert obj.send('test') is False
+ # A second message makes no difference; what is loaded into memory is used
+ assert obj.send('test') is False
+
+ # AutoGen Temporary directory
+ tmpdir1 = tmpdir.mkdir('tmp01')
+ _asset2 = asset.AppriseAsset(
+ storage_mode=PersistentStoreMode.FLUSH,
+ storage_path=str(tmpdir1),
+ # Auto-gen our private/public key pair
+ pem_autogen=True,
+ )
+
+ assert os.listdir(str(tmpdir1)) == []
+ obj = NotifyVapid(
+ 'user@example.ca', targets=['abc123', ], keyfile=keyfile,
+ asset=_asset2)
+ assert isinstance(obj, NotifyVapid)
+ assert isinstance(obj.url(), str)
+ # We have a temporary subscription file we can use
+ assert os.listdir(str(tmpdir1)) == ['00088ad3']
+ # We will have a dud configuration file, but at least it's something
+ # to help the user with
+ assert obj.send('test') is False
+ # Second instance fails as well
+ assert obj.send('test') is False
+
+ # AutoGen Temporary directory
+ tmpdir2 = tmpdir.mkdir('tmp02')
+ _asset3 = asset.AppriseAsset(
+ storage_mode=PersistentStoreMode.FLUSH,
+ storage_path=str(tmpdir2),
+ # Auto-gen our private/public key pair
+ pem_autogen=True,
+ )
+
+ # Test invalid keyfile
+ assert os.path.exists(keyfile)
+ obj = NotifyVapid(
+ 'user@example.ca', targets=['abc123', ], keyfile='invalid-file',
+ subfile=subfile, asset=_asset3)
+ assert isinstance(obj, NotifyVapid)
+ assert isinstance(obj.url(), str)
+ assert obj.send('test') is False
+ # A second message makes no difference; what is loaded into memory is used
+ assert obj.send('test') is False
+
+
+@pytest.mark.skipif(
+ 'cryptography' in sys.modules,
+ reason="Requires that cryptography NOT be installed")
+def test_plugin_vapid_initializations_without_c(tmpdir):
+ """
+ NotifyVapid() Initializations without cryptography
+
+ """
+ # Temporary directory
+ tmpdir0 = tmpdir.mkdir('tmp00')
+
+ # Write our subfile
+ smgr = WebPushSubscriptionManager()
+ sub = {
+ "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
+ "keys": {
+ "p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
+ '5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
+ "auth": 'k9Xzm43nBGo=',
+ },
+ }
+ subfile = os.path.join(str(tmpdir0), 'subscriptions.json')
+ assert smgr.add(sub) is False
+ _asset = asset.AppriseAsset(
+ storage_mode=PersistentStoreMode.FLUSH,
+ storage_path=str(tmpdir0),
+ # Auto-gen our private/public key pair
+ pem_autogen=True,
+ )
+
+ # Auto-Key Generation
+ obj = NotifyVapid(
+ 'user@example.ca', targets=['abc123', ], subfile=subfile,
+ asset=_asset)
+ assert isinstance(obj, NotifyVapid)
diff --git a/test/test_utils_pem.py b/test/test_utils_pem.py
new file mode 100644
index 00000000..fbf1a824
--- /dev/null
+++ b/test/test_utils_pem.py
@@ -0,0 +1,562 @@
+# -*- coding: utf-8 -*-
+# BSD 2-Clause License
+#
+# Apprise - Push Notification Library.
+# Copyright (c) 2025, Chris Caron
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice,
+# this list of conditions and the following disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
+# and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+
+import logging
+import os
+import sys
+import pytest
+from unittest import mock
+
+from apprise import AppriseAsset
+from apprise import PersistentStoreMode
+from apprise import utils
+
+# Disable logging for a cleaner testing output
+logging.disable(logging.CRITICAL)
+
+# Attachment Directory
+TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var')
+
+
+@pytest.mark.skipif(
+ 'cryptography' not in sys.modules, reason="Requires cryptography")
+def test_utils_pem_general(tmpdir):
+ """
+ Utils:PEM
+
+ """
+
+ # string to manipulate/work with
+ unencrypted_str = "message"
+
+ tmpdir0 = tmpdir.mkdir('tmp00')
+
+ # Currently no files here
+ assert os.listdir(str(tmpdir0)) == []
+
+ asset = AppriseAsset(
+ storage_mode=PersistentStoreMode.MEMORY,
+ storage_path=str(tmpdir0),
+ pem_autogen=False,
+ )
+
+ # Create a PEM Controller
+ pem_c = utils.pem.ApprisePEMController(path=None, asset=asset)
+
+ # Nothing to lookup
+ assert pem_c.public_keyfile() is None
+ assert pem_c.public_key() is None
+ assert pem_c.x962_str == ''
+ assert pem_c.decrypt(b'data') is None
+ assert pem_c.encrypt(unencrypted_str) is None
+ # Keys can not be generated in memory mode
+ assert pem_c.keygen() is False
+ assert pem_c.sign(b'data') is None
+
+ asset = AppriseAsset(
+ storage_mode=PersistentStoreMode.FLUSH,
+ storage_path=str(tmpdir0),
+ pem_autogen=False,
+ )
+
+ # No new files
+ assert os.listdir(str(tmpdir0)) == []
+
+ # Our asset is now write mode, so we will be able to generate a key
+ pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
+ # Nothing to lookup
+ assert pem_c.public_keyfile() is None
+ assert pem_c.public_key() is None
+ assert pem_c.x962_str == ''
+ assert pem_c.encrypt(unencrypted_str) is None
+
+ # Generate our keys
+ assert bool(pem_c) is False
+ assert pem_c.keygen() is True
+ assert bool(pem_c) is True
+
+ # We have 2 new key files generated
+ pub_keyfile = os.path.join(str(tmpdir0), 'public_key.pem')
+ prv_keyfile = os.path.join(str(tmpdir0), 'private_key.pem')
+ assert os.path.isfile(pub_keyfile)
+ assert os.path.isfile(prv_keyfile)
+ assert pem_c.public_keyfile() is not None
+ assert pem_c.decrypt("garbage") is None
+ assert pem_c.public_key() is not None
+
+ # Keys used later on as ref
+ pubkey_ref = pem_c.public_key()
+ prvkey_ref = pem_c.private_key()
+
+ assert isinstance(pem_c.x962_str, str)
+ assert len(pem_c.x962_str) > 20
+ content = pem_c.encrypt(unencrypted_str)
+ assert pem_c.decrypt(pem_c.encrypt(unencrypted_str.encode('utf-8'))) \
+ == pem_c.decrypt(pem_c.encrypt(unencrypted_str))
+ assert pem_c.decrypt(
+ pem_c.encrypt(unencrypted_str, public_key=pem_c.public_key())) \
+ == pem_c.decrypt(pem_c.encrypt(unencrypted_str))
+ assert pem_c.decrypt(content) == unencrypted_str
+ assert isinstance(content, str)
+ assert pem_c.decrypt(content) == unencrypted_str
+ # support str as well
+ assert pem_c.decrypt(content) == unencrypted_str
+ assert pem_c.decrypt(content.encode('utf-8')) == unencrypted_str
+ # Sign test
+ assert isinstance(pem_c.sign(content.encode('utf-8')), bytes)
+
+ # Web Push handling
+ webpush_content = pem_c.encrypt_webpush(
+ unencrypted_str,
+ public_key=pem_c.public_key(),
+ auth_secret=b'secret')
+ assert isinstance(webpush_content, bytes)
+
+ webpush_content = pem_c.encrypt_webpush(
+ unencrypted_str.encode('utf-8'),
+ public_key=pem_c.public_key(),
+ auth_secret=b'secret')
+ assert isinstance(webpush_content, bytes)
+
+ # Non Bytes (garbage basically)
+ with pytest.raises(TypeError):
+ assert pem_c.decrypt(None) is None
+
+ with pytest.raises(TypeError):
+ assert pem_c.decrypt(5) is None
+
+ with pytest.raises(TypeError):
+ assert pem_c.decrypt(False) is None
+
+ with pytest.raises(TypeError):
+ assert pem_c.decrypt(object) is None
+
+ # Test our initialization
+ pem_c = utils.pem.ApprisePEMController(
+ path=None,
+ prv_keyfile='invalid',
+ asset=asset)
+ assert pem_c.private_keyfile() is False
+ assert pem_c.public_keyfile() is None
+ assert pem_c.prv_keyfile is False
+ assert pem_c.pub_keyfile is None
+ assert pem_c.private_key() is None
+ assert pem_c.public_key() is None
+ assert pem_c.decrypt(content) is None
+
+ pem_c = utils.pem.ApprisePEMController(
+ path=None,
+ pub_keyfile='invalid',
+ asset=asset)
+ assert pem_c.private_keyfile() is None
+ assert pem_c.public_keyfile() is False
+ assert pem_c.prv_keyfile is None
+ assert pem_c.pub_keyfile is False
+ assert pem_c.private_key() is None
+ assert pem_c.public_key() is None
+ assert pem_c.decrypt(content) is None
+
+ pem_c = utils.pem.ApprisePEMController(
+ path=None,
+ prv_keyfile=prv_keyfile,
+ asset=asset)
+ assert pem_c.private_keyfile() == prv_keyfile
+ assert pem_c.public_keyfile() is None
+ assert pem_c.private_key() is not None
+ assert pem_c.prv_keyfile == prv_keyfile
+ assert pem_c.pub_keyfile is None
+ assert pem_c.public_key() is not None
+ assert pem_c.decrypt(content) == unencrypted_str
+
+ pem_c = utils.pem.ApprisePEMController(
+ path=None,
+ pub_keyfile=pub_keyfile,
+ asset=asset)
+ assert pem_c.private_keyfile() is None
+ assert pem_c.public_keyfile() == pub_keyfile
+ assert pem_c.prv_keyfile is None
+ assert pem_c.pub_keyfile == pub_keyfile
+ assert pem_c.private_key() is None
+ assert pem_c.public_key() is not None
+ assert pem_c.decrypt(content) is None
+
+ # Test our path references
+ pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
+ assert pem_c.load_private_key(path=None) is True
+ assert pem_c.private_keyfile() == prv_keyfile
+ assert pem_c.prv_keyfile is None
+ assert pem_c.pub_keyfile is None
+ assert pem_c.decrypt(content) == unencrypted_str
+
+ # Generate a new key referencing another location
+ pem_c = utils.pem.ApprisePEMController(
+ name='keygen-tests',
+ path=str(tmpdir0), asset=asset)
+
+ # generate ourselves some keys
+ assert pem_c.keygen() is True
+ keygen_prv_file = pem_c.prv_keyfile
+ keygen_pub_file = pem_c.pub_keyfile
+
+ # Remove 1 (but not both)
+ os.unlink(keygen_pub_file)
+
+ pem_c = utils.pem.ApprisePEMController(
+ name='keygen-tests',
+ path=str(tmpdir0), asset=asset)
+ # Private key was found, so this does not work
+ assert pem_c.keygen() is False
+ os.unlink(keygen_prv_file)
+
+ pem_c = utils.pem.ApprisePEMController(
+ name='keygen-tests',
+ path=str(tmpdir0), asset=asset)
+ # It works now
+ assert pem_c.keygen() is True
+
+ # Tests public_key generation failure only
+ with mock.patch('builtins.open', side_effect=OSError()):
+ assert pem_c.keygen(force=True) is False
+ with mock.patch('os.unlink', side_effect=OSError()):
+ assert pem_c.keygen(force=True) is False
+ with mock.patch('os.unlink', return_value=True):
+ assert pem_c.keygen(force=True) is False
+
+ # Tests private key generation
+ side_effect = [
+ mock.mock_open(read_data="file contents").return_value] + \
+ [OSError() for _ in range(10)]
+ with mock.patch('builtins.open', side_effect=side_effect):
+ assert pem_c.keygen(force=True) is False
+ with mock.patch('builtins.open', side_effect=side_effect):
+ with mock.patch('os.unlink', side_effect=OSError()):
+ assert pem_c.keygen(force=True) is False
+ with mock.patch('builtins.open', side_effect=side_effect):
+ with mock.patch('os.unlink', return_value=True):
+ assert pem_c.keygen(force=True) is False
+
+ # Generate a new key referencing another location
+ pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
+ # We can't re-generate keys if ones already exist
+ assert pem_c.keygen() is False
+ # the keygen is the big difference here
+ assert pem_c.keygen(name='test') is True
+ # under the hood, a key is not regenerated (as one already exists)
+ assert pem_c.keygen(name='test') is False
+ # Generate it a second time by force
+ assert pem_c.keygen(name='test', force=True) is True
+
+ assert pem_c.private_keyfile() == os.path.join(
+ str(tmpdir0), 'test-private_key.pem')
+ assert pem_c.public_keyfile() == os.path.join(
+ str(tmpdir0), 'test-public_key.pem')
+ assert pem_c.private_key() is not None
+ assert pem_c.public_key() is not None
+ assert pem_c.prv_keyfile == os.path.join(
+ str(tmpdir0), 'test-private_key.pem')
+ assert pem_c.pub_keyfile == os.path.join(
+ str(tmpdir0), 'test-public_key.pem')
+ # 'content' was generated using a different key and can not be
+ # decrypted
+ assert pem_c.decrypt(content) is None
+
+ # Test Decryption files
+ pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
+ # Calling decrypt triggers underlining code to auto-load
+ assert pem_c.decrypt(content) == unencrypted_str
+ # Using a private key by path
+ assert pem_c.decrypt(
+ content, private_key=pem_c.private_key()) == unencrypted_str
+
+ # Test different edge cases of load_private_key()
+ pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
+ assert pem_c.load_private_key() is True
+ pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
+ assert pem_c.load_private_key(path=prv_keyfile) is True
+ pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
+ with mock.patch('builtins.open', side_effect=TypeError()):
+ assert pem_c.load_private_key(path=prv_keyfile) is False
+ with mock.patch('builtins.open', side_effect=OSError()):
+ assert pem_c.load_private_key(path=prv_keyfile) is False
+ with mock.patch('builtins.open', side_effect=FileNotFoundError()):
+ assert pem_c.load_private_key(path=prv_keyfile) is False
+
+ # Test different edge cases of load_public_key()
+ pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
+ assert pem_c.load_public_key() is True
+ pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
+ assert pem_c.load_public_key(path=pub_keyfile) is True
+ pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
+ with mock.patch('builtins.open', side_effect=TypeError()):
+ assert pem_c.load_public_key(path=pub_keyfile) is False
+ with mock.patch('builtins.open', side_effect=OSError()):
+ assert pem_c.load_public_key(path=pub_keyfile) is False
+ with mock.patch('builtins.open', side_effect=FileNotFoundError()):
+ assert pem_c.load_public_key(path=pub_keyfile) is False
+
+ pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
+ assert pem_c.public_keyfile('test1', 'test2') == pub_keyfile
+ assert pem_c.private_keyfile('test1', 'test2') == prv_keyfile
+
+ pem_c = utils.pem.ApprisePEMController(
+ path=str(tmpdir0), name='pub1', asset=asset)
+ assert pem_c.public_key(autogen=True)
+
+ pem_c = utils.pem.ApprisePEMController(
+ path=str(tmpdir0), name='pub2', asset=asset)
+ assert pem_c.private_key(autogen=True)
+
+ #
+ # Auto key generation turned on
+ #
+ asset = AppriseAsset(
+ storage_mode=PersistentStoreMode.MEMORY,
+ storage_path=str(tmpdir0),
+ pem_autogen=True,
+ )
+ pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
+ assert pem_c.load_public_key(path=pub_keyfile) is True
+ pem_c = utils.pem.ApprisePEMController(path=None, asset=asset)
+ assert pem_c.load_public_key(path=pub_keyfile) is True
+
+ tmpdir1 = tmpdir.mkdir('tmp01')
+
+ # Currently no files here
+ assert os.listdir(str(tmpdir1)) == []
+
+ asset = AppriseAsset(
+ storage_mode=PersistentStoreMode.MEMORY,
+ storage_path=str(tmpdir1),
+ pem_autogen=False,
+ )
+
+ # Auto-Gen is turned off, so weare not successful here
+ pem_c = utils.pem.ApprisePEMController(path=None, asset=asset)
+ assert pem_c.public_key() is None
+ assert pem_c.private_key() is None
+ pem_c = utils.pem.ApprisePEMController(path=str(tmpdir1), asset=asset)
+ assert pem_c.public_key() is None
+ assert pem_c.private_key() is None
+
+ asset = AppriseAsset(
+ storage_mode=PersistentStoreMode.FLUSH,
+ storage_path=str(tmpdir1),
+ pem_autogen=True,
+ )
+ pem_c = utils.pem.ApprisePEMController(path=str(tmpdir1), asset=asset)
+ # Generate ourselves a private key
+ assert pem_c.public_key() is not None
+ assert pem_c.private_key() is not None
+ pub_keyfile = os.path.join(str(tmpdir1), 'public_key.pem')
+ prv_keyfile = os.path.join(str(tmpdir1), 'private_key.pem')
+ assert os.path.isfile(pub_keyfile)
+ assert os.path.isfile(prv_keyfile)
+
+ with open(pub_keyfile, 'w') as f:
+ f.write('garbage')
+
+ pem_c = utils.pem.ApprisePEMController(path=str(tmpdir1), asset=asset)
+ # we can still load our data as the public key is generated
+ # from the private
+ assert pem_c.public_key() is not None
+ assert pem_c.private_key() is not None
+
+ tmpdir2 = tmpdir.mkdir('tmp02')
+ pem_c = utils.pem.ApprisePEMController(path=str(tmpdir2), asset=asset)
+ pub_keyfile = os.path.join(str(tmpdir2), 'public_key.pem')
+ prv_keyfile = os.path.join(str(tmpdir2), 'private_key.pem')
+ assert not os.path.isfile(pub_keyfile)
+ assert not os.path.isfile(prv_keyfile)
+
+ #
+ # Public Key Edge Case Tests
+ #
+ with \
+ mock.patch.object(
+ pem_c, 'public_keyfile', side_effect=[None, pub_keyfile]) \
+ as mock_keyfile, \
+ mock.patch.object(
+ pem_c, 'keygen', side_effect=lambda *_, **__: setattr(
+ pem_c, '_ApprisePEMController__public_key',
+ pubkey_ref) or True) as mock_keygen, \
+ mock.patch.object(
+ pem_c, 'load_public_key', return_value=True):
+
+ result = pem_c.public_key()
+ assert result is pubkey_ref
+ assert mock_keyfile.call_count == 2
+ mock_keygen.assert_called_once()
+
+ # - First call: None → triggers keygen
+ # - Second call (recursive): None → causes fallback
+ public_keyfile_side_effect = [None, None]
+
+ with mock.patch.object(
+ pem_c, 'public_keyfile', side_effect=public_keyfile_side_effect) \
+ as mock_keyfile, \
+ mock.patch.object(pem_c, 'keygen', return_value=True) \
+ as mock_keygen, \
+ mock.patch.object(pem_c, 'load_public_key', return_value=False) \
+ as mock_load:
+
+ # Ensure no key is preset initially
+ setattr(pem_c, '_ApprisePEMController__public_key', None)
+
+ result = pem_c.public_key()
+ assert result is None
+ # Once in outer call, once in recursive
+ assert mock_keyfile.call_count == 2
+ mock_keygen.assert_called_once()
+ mock_load.assert_not_called()
+
+ #
+ # Private Key Edge Case Tests
+ #
+ with \
+ mock.patch.object(
+ pem_c, 'private_keyfile', side_effect=[None, prv_keyfile]) \
+ as mock_keyfile, \
+ mock.patch.object(
+ pem_c, 'keygen', side_effect=lambda *_, **__: setattr(
+ pem_c, '_ApprisePEMController__private_key',
+ prvkey_ref) or True) as mock_keygen, \
+ mock.patch.object(
+ pem_c, 'load_private_key', return_value=True):
+
+ result = pem_c.private_key()
+ assert result is prvkey_ref
+ assert mock_keyfile.call_count == 2
+ mock_keygen.assert_called_once()
+
+ # - First call: None → triggers keygen
+ # - Second call (recursive): None → causes fallback
+ private_keyfile_side_effect = [None, None]
+
+ with mock.patch.object(
+ pem_c, 'private_keyfile',
+ side_effect=private_keyfile_side_effect) as mock_keyfile, \
+ mock.patch.object(pem_c, 'keygen', return_value=True) \
+ as mock_keygen, \
+ mock.patch.object(pem_c, 'load_private_key', return_value=False) \
+ as mock_load:
+
+ # Ensure no key is preset initially
+ setattr(pem_c, '_ApprisePEMController__private_key', None)
+
+ result = pem_c.private_key()
+ assert result is None
+ # Once in outer call, once in recursive
+ assert mock_keyfile.call_count == 2
+ mock_keygen.assert_called_once()
+ mock_load.assert_not_called()
+
+
+@pytest.mark.skipif(
+ 'cryptography' in sys.modules,
+ reason="Requires that cryptography NOT be installed")
+def test_utils_pem_general_without_c(tmpdir):
+ """
+ Utils:PEM Without cryptography
+
+ """
+
+ tmpdir0 = tmpdir.mkdir('tmp00')
+
+ # Currently no files here
+ assert os.listdir(str(tmpdir0)) == []
+
+ asset = AppriseAsset(
+ storage_mode=PersistentStoreMode.MEMORY,
+ storage_path=str(tmpdir0),
+ pem_autogen=False,
+ )
+
+ # Create a PEM Controller
+ pem_c = utils.pem.ApprisePEMController(path=None, asset=asset)
+
+ # cryptography library missing poses issues with library useage
+ with pytest.raises(utils.pem.ApprisePEMException):
+ pem_c.public_keyfile()
+
+ with pytest.raises(utils.pem.ApprisePEMException):
+ pem_c.public_key()
+
+ with pytest.raises(utils.pem.ApprisePEMException):
+ pem_c.x962_str
+
+ with pytest.raises(utils.pem.ApprisePEMException):
+ pem_c.encrypt("message")
+
+ with pytest.raises(utils.pem.ApprisePEMException):
+ pem_c.keygen()
+
+ asset = AppriseAsset(
+ storage_mode=PersistentStoreMode.FLUSH,
+ storage_path=str(tmpdir0),
+ pem_autogen=False,
+ )
+
+ # No new files
+ assert os.listdir(str(tmpdir0)) == []
+
+ # Our asset is now write mode, so we will be able to generate a key
+ pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
+ # Nothing to lookup
+ with pytest.raises(utils.pem.ApprisePEMException):
+ pem_c.private_keyfile()
+
+ with pytest.raises(utils.pem.ApprisePEMException):
+ pem_c.private_key()
+
+ with pytest.raises(utils.pem.ApprisePEMException):
+ pem_c.x962_str
+
+ with pytest.raises(utils.pem.ApprisePEMException):
+ pem_c.encrypt("message")
+
+ # Keys can not be generated in memory mode
+ with pytest.raises(utils.pem.ApprisePEMException):
+ pem_c.keygen()
+
+ # No files loaded
+ assert os.listdir(str(tmpdir0)) == []
+
+ with pytest.raises(utils.pem.ApprisePEMException):
+ pem_c.public_keyfile()
+
+ with pytest.raises(utils.pem.ApprisePEMException):
+ pem_c.public_key()
+
+ with pytest.raises(utils.pem.ApprisePEMException):
+ pem_c.x962_str
+
+ with pytest.raises(utils.pem.ApprisePEMException):
+ pem_c.encrypt("message")
+
+ with pytest.raises(utils.pem.ApprisePEMException):
+ pem_c.decrypt("abcd==")