Vapid/WebPush Support (#1323)

pull/1358/head
Chris Caron 2025-06-30 04:00:41 +02:00 committed by GitHub
parent ce90151051
commit b2e4b921ea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 3193 additions and 4 deletions

View File

@ -15,4 +15,3 @@ source =
show_missing = True
skip_covered = True
skip_empty = True
fail_under = 95.0

View File

@ -109,9 +109,11 @@ Threema Gateway
Twilio
Twist
Twitter
Vapid
VictorOps
Voipms
Vonage
Webpush
Webex
WeCom Bot
WhatsApp

View File

@ -132,6 +132,7 @@ The table below identifies the services this tool supports and some example serv
| [Telegram](https://github.com/caronc/apprise/wiki/Notify_telegram) | tgram:// | (TCP) 443 | tgram://bottoken/ChatID<br />tgram://bottoken/ChatID1/ChatID2/ChatIDN
| [Twitter](https://github.com/caronc/apprise/wiki/Notify_twitter) | twitter:// | (TCP) 443 | twitter://CKey/CSecret/AKey/ASecret<br/>twitter://user@CKey/CSecret/AKey/ASecret<br/>twitter://CKey/CSecret/AKey/ASecret/User1/User2/User2<br/>twitter://CKey/CSecret/AKey/ASecret?mode=tweet
| [Twist](https://github.com/caronc/apprise/wiki/Notify_twist) | twist:// | (TCP) 443 | twist://pasword:login<br/>twist://password:login/#channel<br/>twist://password:login/#team:channel<br/>twist://password:login/#team:channel1/channel2/#team3:channel
| [Vapid (WebPush)](https://github.com/caronc/apprise/wiki/Notify_vapid) | vapid:// | (TCP) 443 | vapid://subscriber/target<br/>vapid://subscriber/target?subfile=path&keyfile=path
| [Webex Teams (Cisco)](https://github.com/caronc/apprise/wiki/Notify_wxteams) | wxteams:// | (TCP) 443 | wxteams://Token
| [WeCom Bot](https://github.com/caronc/apprise/wiki/Notify_wecombot) | wecombot:// | (TCP) 443 | wecombot://BotKey
| [WhatsApp](https://github.com/caronc/apprise/wiki/Notify_whatsapp) | whatsapp:// | (TCP) 443 | whatsapp://AccessToken@FromPhoneID/ToPhoneNo<br/>whatsapp://Template:AccessToken@FromPhoneID/ToPhoneNo

View File

@ -273,6 +273,17 @@ class AppriseAttachment:
return attach_plugin
def sync(self, abort_on_error=True, abort_if_empty=True):
"""
Itereates over all of the attachments and retrieves them
"""
# TODO: Change this to async for future
return False if abort_if_empty and not self.attachments else (
next((False for a in self.attachments if not a), True)
if abort_on_error
else next((True for a in self.attachments), True))
def clear(self):
"""
Empties our attachment list

View File

@ -149,6 +149,12 @@ class AppriseAsset:
# if Persistent Storage was set to `memory`
pgp_autogen = True
# Automatically generate our Privacy Enhanced Mail (PEM) keys if one isn't
# present and our environment configuration allows for it.
# For example, a case where the environment wouldn't allow for it would be
# if Persistent Storage was set to `memory`
pem_autogen = True
# For more detail see CWE-312 @
# https://cwe.mitre.org/data/definitions/312.html
#

View File

@ -53,6 +53,14 @@ class AppriseDiskIOError(AppriseException):
super().__init__(message, error_code=error_code)
class AppriseInvalidData(AppriseException):
"""
Thrown when bad data was passed into an internal function
"""
def __init__(self, message, error_code=errno.EINVAL):
super().__init__(message, error_code=error_code)
class AppriseFileNotFound(AppriseDiskIOError, FileNotFoundError):
"""
Thrown when a persistent write occured in MEMORY mode

View File

@ -0,0 +1,590 @@
# -*- coding: utf-8 -*-
# BSD 2-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2025, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import os
import requests
from itertools import chain
from json import dumps
from . import subscription
from ..base import NotifyBase
from ...common import NotifyType
from ...common import NotifyImageSize
from ...common import PersistentStoreMode
from ...utils.parse import parse_list, parse_bool, is_email
from ...utils.base64 import base64_urlencode
from ...utils import pem as _pem
from ...locale import gettext_lazy as _
import time
class VapidPushMode:
"""
Supported Vapid Push Services
"""
CHROME = 'chrome'
FIREFOX = 'firefox'
EDGE = 'edge'
OPERA = 'opera'
APPLE = 'apple'
SAMSUNG = 'samsung'
BRAVE = 'brave'
GENERIC = 'generic'
VAPID_API_LOOKUP = {
VapidPushMode.CHROME:
'https://fcm.googleapis.com/fcm/send',
VapidPushMode.FIREFOX:
'https://updates.push.services.mozilla.com/wpush/v1',
VapidPushMode.EDGE:
'https://fcm.googleapis.com/fcm/send', # Edge uses FCM too
VapidPushMode.OPERA:
'https://fcm.googleapis.com/fcm/send', # Opera is Chromium-based
VapidPushMode.APPLE:
'https://web.push.apple.com', # Apple Web Push base endpoint
VapidPushMode.BRAVE:
'https://fcm.googleapis.com/fcm/send',
VapidPushMode.SAMSUNG:
'https://fcm.googleapis.com/fcm/send',
VapidPushMode.GENERIC:
'https://fcm.googleapis.com/fcm/send',
}
VAPID_PUSH_MODES = (
VapidPushMode.CHROME,
VapidPushMode.FIREFOX,
VapidPushMode.EDGE,
VapidPushMode.OPERA,
VapidPushMode.APPLE,
)
class NotifyVapid(NotifyBase):
"""
A wrapper for WebPush/Vapid notifications
"""
# Set our global enabled flag
enabled = subscription.CRYPTOGRAPHY_SUPPORT and _pem.PEM_SUPPORT
requirements = {
# Define our required packaging in order to work
'packages_required': 'cryptography'
}
# The default descriptive name associated with the Notification
service_name = 'Vapid Web Push Notifications'
# The services URL
service_url = \
'https://datatracker.ietf.org/doc/html/draft-thomson-webpush-vapid'
# The default protocol
secure_protocol = 'vapid'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_vapid'
# There is no reason we should exceed 5KB when reading in a PEM file.
# If it is more than this, then it is not accepted.
max_vapid_keyfile_size = 5000
# There is no reason we should exceed 5MB when reading in a JSON file.
# If it is more than this, then it is not accepted.
max_vapid_subfile_size = 5242880
# The maximum length of the messge can be 4096
# just choosing a safe number below this to allow for padding and
# encryption
body_maxlen = 4000
# A title can not be used for SMS Messages. Setting this to zero will
# cause any title (if defined) to get placed into the message body.
title_maxlen = 0
# Our default is to no not use persistent storage beyond in-memory
# reference; this allows us to auto-generate our config if needed
storage_mode = PersistentStoreMode.AUTO
# 43200 = 12 hours
vapid_jwt_expiration_sec = 43200
# Subscription file
vapid_subscription_file = 'subscriptions.json'
# Allows the user to specify the NotifyImageSize object
image_size = NotifyImageSize.XY_72
# Define object templates
templates = (
'{schema}://{subscriber}',
'{schema}://{subscriber}/{targets}',
)
# Define our template tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'subscriber': {
'name': _('API Key'),
'type': 'string',
'private': True,
'required': True,
},
'targets': {
'name': _('Targets'),
'type': 'list:string',
},
})
# Define our template args
template_args = dict(NotifyBase.template_tokens, **{
'mode': {
'name': _('Mode'),
'type': 'choice:string',
'values': VAPID_PUSH_MODES,
'default': VAPID_PUSH_MODES[0],
'map_to': 'mode',
},
# Default Time To Live (defined in seconds)
# 0 (Zero) - message will be delivered only if the device is reacheable
'ttl': {
'name': _('ttl'),
'type': 'int',
'default': 0,
'min': 0,
'max': 60,
},
'to': {
'alias_of': 'targets',
},
'from': {
'alias_of': 'subscriber',
},
'keyfile': {
# A Private Keyfile is required to sign header
'name': _('PEM Private KeyFile'),
'type': 'string',
'private': True,
},
'subfile': {
# A Subscripion File is required to sign header
'name': _('Subscripion File'),
'type': 'string',
'private': True,
},
'image': {
'name': _('Include Image'),
'type': 'bool',
'default': True,
'map_to': 'include_image',
},
})
def __init__(self, subscriber, mode=None, targets=None, keyfile=None,
subfile=None, include_image=None, ttl=None, **kwargs):
"""
Initialize Vapid Messaging
"""
super().__init__(**kwargs)
# Path to our Private Key file
self.keyfile = None
# Path to our subscription.json file
self.subfile = None
#
# Our Targets
#
self.targets = []
self._invalid_targets = []
# default subscriptions
self.subscriptions = {}
self.subscriptions_loaded = False
self.private_key_loaded = False
# Set our Time to Live Flag
self.ttl = self.template_args['ttl']['default']
if ttl is not None:
try:
self.ttl = int(ttl)
except (ValueError, TypeError):
# Do nothing
pass
if self.ttl < self.template_args['ttl']['min'] or \
self.ttl > self.template_args['ttl']['max']:
msg = 'The Vapid TTL specified ({}) is out of range.'\
.format(self.ttl)
self.logger.warning(msg)
raise TypeError(msg)
# Place a thumbnail image inline with the message body
self.include_image = \
self.template_args['image']['default'] \
if include_image is None else include_image
result = is_email(subscriber)
if not result:
msg = 'An invalid Vapid Subscriber' \
'({}) was specified.'.format(subscriber)
self.logger.warning(msg)
raise TypeError(msg)
self.subscriber = result['full_email']
# Store our Mode/service
try:
self.mode = \
NotifyVapid.template_args['mode']['default'] \
if mode is None else mode.lower()
if self.mode not in VAPID_PUSH_MODES:
# allow the outer except to handle this common response
raise
except:
# Invalid region specified
msg = 'The Vapid mode specified ({}) is invalid.' \
.format(mode)
self.logger.warning(msg)
raise TypeError(msg)
# Our Private keyfile
self.keyfile = keyfile
# Our Subscription file
self.subfile = subfile
# Prepare our PEM Object
self.pem = _pem.ApprisePEMController(self.store.path, asset=self.asset)
# Create our subscription object
self.subscriptions = subscription.WebPushSubscriptionManager(
asset=self.asset)
if self.subfile is None and \
self.store.mode != PersistentStoreMode.MEMORY and \
self.asset.pem_autogen:
self.subfile = os.path.join(
self.store.path, self.vapid_subscription_file)
if not os.path.exists(self.subfile) and \
self.subscriptions.write(self.subfile):
self.logger.info(
'Vapid auto-generated %s/%s',
os.path.basename(self.store.path),
self.vapid_subscription_file)
# Acquire our targets for parsing
self.targets = parse_list(targets)
if not self.targets:
# Add ourselves
self.targets.append(self.subscriber)
return
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Vapid Notification
"""
if not self.private_key_loaded and ((
self.keyfile and not self.pem.private_key(
autogen=False, autodetect=False)
and not self.pem.load_private_key(self.keyfile))
or (not self.keyfile and not self.pem)):
self.logger.warning(
'Provided Vapid/WebPush (PEM) Private Key file could '
'not be loaded.')
self.private_key_loaded = True
return False
else:
self.private_key_loaded = True
if not self.targets:
# There is no one to notify; we're done
self.logger.warning('There are no Vapid targets to notify')
return False
if not self.subscriptions_loaded and self.subfile:
# Toggle our loaded flag to prevent trying again later
self.subscriptions_loaded = True
if not self.subscriptions.load(
self.subfile, byte_limit=self.max_vapid_subfile_size):
self.logger.warning(
'Provided Vapid/WebPush subscriptions file could not be '
'loaded.')
return False
if not self.subscriptions:
self.logger.warning('Vapid could not load subscriptions')
return False
if not self.pem.private_key(autogen=False, autodetect=False):
self.logger.warning(
'No Vapid/WebPush (PEM) Private Key file could be loaded.')
return False
# Prepare our notify URL (based on our mode)
notify_url = VAPID_API_LOOKUP[self.mode]
headers = {
'User-Agent': self.app_id,
"TTL": str(self.ttl),
"Content-Encoding": "aes128gcm",
"Content-Type": "application/octet-stream",
"Authorization": f"vapid t={self.jwt_token}, k={self.public_key}",
}
has_error = False
# Create a copy of the targets list
targets = list(self.targets)
while len(targets):
target = targets.pop(0)
if target not in self.subscriptions:
self.logger.warning(
'Dropped Vapid user '
'(%s) specified - not found in subscriptions.json.' %
target,
)
# Save ourselves from doing this again
self._invalid_targets.append(target)
self.targets.remove(target)
has_error = True
continue
# Encrypt our payload
encrypted_payload = self.pem.encrypt_webpush(
body,
public_key=self.subscriptions[target].public_key,
auth_secret=self.subscriptions[target].auth_secret)
self.logger.debug(
'Vapid %s POST URL: %s (cert_verify=%r)',
self.mode, notify_url, self.verify_certificate,
)
self.logger.debug(
'Vapid %s Encrypted Payload: %d byte(s)', self.mode, len(body))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
notify_url,
data=encrypted_payload,
headers=headers,
verify=self.verify_certificate,
timeout=self.request_timeout,
)
if r.status_code not in (
requests.codes.ok, requests.codes.no_content):
# We had a problem
status_str = \
NotifyBase.http_response_code_lookup(r.status_code)
self.logger.warning(
'Failed to send {} Vapid notification: '
'{}{}error={}.'.format(
self.mode,
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n%s', r.content)
has_error = True
else:
self.logger.info('Sent %s Vapid notification.', self.mode)
except requests.RequestException as e:
self.logger.warning(
'A Connection error occurred sending Vapid '
'notification.'
)
self.logger.debug('Socket Exception: %s', str(e))
has_error = True
return not has_error
@property
def url_identifier(self):
"""
Returns all of the identifiers that make this URL unique from
another simliar one. Targets or end points should never be identified
here.
"""
return (self.secure_protocol, self.mode, self.subscriber)
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any URL parameters
params = {
'mode': self.mode,
'ttl': str(self.ttl),
}
if self.keyfile:
# Include our keyfile if specified
params['keyfile'] = self.keyfile
if self.subfile:
# Include our subfile if specified
params['subfile'] = self.subfile
# Extend our parameters
params.update(self.url_parameters(privacy=privacy, *args, **kwargs))
targets = self.targets if not (
self.targets == 1 and
self.targets[0].lower() == self.subscriber.lower()) else []
return '{schema}://{subscriber}/{targets}?{params}'.format(
schema=self.secure_protocol,
subscriber=NotifyVapid.quote(self.subscriber, safe='@'),
targets='/'.join(chain(
[str(t) for t in targets],
[NotifyVapid.quote(x, safe='@')
for x in self._invalid_targets])),
params=NotifyVapid.urlencode(params),
)
def __len__(self):
"""
Returns the number of targets associated with this notification
"""
targets = len(self.targets)
return targets if targets else 1
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results
# Prepare our targets
results['targets'] = []
if 'from' in results['qsd'] and len(results['qsd']['from']):
results['subscriber'] = \
NotifyVapid.unquote(results['qsd']['from'])
if results['user'] and results['host']:
# whatever is left on the URL goes
results['targets'].append('{}@{}'.format(
NotifyVapid.unquote(results['user']),
NotifyVapid.unquote(results['host']),
))
elif results['host']:
results['targets'].append(
NotifyVapid.unquote(results['host']))
else:
# Acquire our subscriber information
results['subscriber'] = '{}@{}'.format(
NotifyVapid.unquote(results['user']),
NotifyVapid.unquote(results['host']),
)
results['targets'].extend(
NotifyVapid.split_path(results['fullpath']))
# Get our mode
results['mode'] = results['qsd'].get('mode')
# Get Image Flag
results['include_image'] = \
parse_bool(results['qsd'].get(
'image', NotifyVapid.template_args['image']['default']))
# The 'to' makes it easier to use yaml configuration
if 'to' in results['qsd'] and len(results['qsd']['to']):
results['targets'] += \
NotifyVapid.parse_list(results['qsd']['to'])
# Our Private Keyfile (PEM)
if 'keyfile' in results['qsd'] and results['qsd']['keyfile']:
results['keyfile'] = \
NotifyVapid.unquote(results['qsd']['keyfile'])
# Our Subscription File (JSON)
if 'subfile' in results['qsd'] and results['qsd']['subfile']:
results['subfile'] = \
NotifyVapid.unquote(results['qsd']['subfile'])
# Support the 'ttl' variable
if 'ttl' in results['qsd'] and len(results['qsd']['ttl']):
results['ttl'] = \
NotifyVapid.unquote(results['qsd']['ttl'])
return results
@property
def jwt_token(self):
"""
Returns our VAPID Token based on class details
"""
# JWT header
header = {
"alg": "ES256",
"typ": "JWT"
}
# JWT payload
payload = {
"aud": VAPID_API_LOOKUP[self.mode],
"exp": int(time.time()) + self.vapid_jwt_expiration_sec,
"sub": f"mailto:{self.subscriber}"
}
# Base64 URL encode header and payload
header_b64 = base64_urlencode(
dumps(header, separators=(",", ":")).encode('utf-8'))
payload_b64 = base64_urlencode(
dumps(payload, separators=(",", ":")).encode('utf-8'))
signing_input = f"{header_b64}.{payload_b64}".encode('utf-8')
signature_b64 = base64_urlencode(self.pem.sign(signing_input))
# Return final token
return f"{header_b64}.{payload_b64}.{signature_b64}"
@property
def public_key(self):
"""
Returns our public key representation
"""
return self.pem.x962_str

View File

@ -0,0 +1,431 @@
# -*- coding: utf-8 -*-
# BSD 2-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2025, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import json
from typing import Optional, Union
from ...asset import AppriseAsset
from ...utils.base64 import base64_urldecode
from ...exception import AppriseInvalidData
from ...apprise_attachment import AppriseAttachment
try:
from cryptography.hazmat.primitives.asymmetric import ec
# Cryptography Support enabled
CRYPTOGRAPHY_SUPPORT = True
except ImportError:
# Cryptography Support disabled
CRYPTOGRAPHY_SUPPORT = False
class WebPushSubscription:
"""
WebPush Subscription
"""
# Format:
# {
# "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...",
# "keys": {
# "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...",
# "auth": "k9Xzm43nBGo=",
# }
# }
def __init__(self, content: Union[str, dict, None] = None) -> None:
"""
Prepares a webpush object provided with content
Content can be a dictionary, or JSON String
"""
# Our variables
self.__endpoint = None
self.__p256dh = None
self.__auth = None
self.__auth_secret = None
self.__public_key = None
if content is not None:
if not self.load(content):
raise AppriseInvalidData('Could not load subscription')
def load(self, content: Union[str, dict, None] = None) -> bool:
"""
Performs the loading/validation of the object
"""
# Reset our variables
self.__endpoint = None
self.__p256dh = None
self.__auth = None
self.__auth_secret = None
self.__public_key = None
if not CRYPTOGRAPHY_SUPPORT:
return False
if isinstance(content, str):
try:
content = json.loads(content)
except (json.decoder.JSONDecodeError, TypeError, OSError):
# Bad data
return False
if not isinstance(content, dict):
# We could not load he result set
return False
# Retreive our contents for validation
endpoint = content.get('endpoint')
if not isinstance(endpoint, str):
return False
try:
p256dh = base64_urldecode(content['keys']['p256dh'])
if not p256dh:
return False
auth_secret = base64_urldecode(content['keys']['auth'])
if not auth_secret:
return False
except KeyError:
return False
try:
# Store our data
self.__public_key = ec.EllipticCurvePublicKey.from_encoded_point(
ec.SECP256R1(), p256dh,
)
except ValueError:
# Invalid p256dh key (Can't load Public Key)
return False
self.__endpoint = endpoint
self.__p256dh = content['keys']['p256dh']
self.__auth = content['keys']['auth']
self.__auth_secret = auth_secret
return True
def write(self, path: str, indent: int = 2) -> bool:
"""
Writes content to disk based on path specified. Content is a JSON
file, so ideally you may wish to have `.json' as it's extension for
clarity
"""
if not self.__public_key:
return False
try:
with open(path, 'w', encoding='utf-8') as f:
json.dump(self.dict, f, indent=indent)
except (TypeError, OSError):
# Could not write content
return False
return True
@property
def auth(self) -> Optional[str]:
return self.__auth if self.__public_key else None
@property
def endpoint(self) -> Optional[str]:
return self.__endpoint if self.__public_key else None
@property
def p256dh(self) -> Optional[str]:
return self.__p256dh if self.__public_key else None
@property
def auth_secret(self) -> Optional[bytes]:
return self.__auth_secret if self.__public_key else None
@property
def public_key(self) -> Optional['ec.EllipticCurvePublicKey']:
return self.__public_key
@property
def dict(self) -> dict:
return {
"endpoint": self.__endpoint,
"keys": {
"p256dh": self.__p256dh,
"auth": self.__auth,
},
} if self.__public_key else {
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123...',
"keys": {
"p256dh": '<place public key in base64 here>',
"auth": '<place auth in base64 here>',
},
}
def json(self, indent: int = 2) -> str:
"""
Returns JSON representation of the object
"""
return json.dumps(self.dict, indent=indent)
def __bool__(self) -> bool:
"""
handle 'if' statement
"""
return True if self.__public_key else False
def __str__(self) -> str:
"""
Returns our JSON entry as a string
"""
# Return the first 16 characters of the detected endpoint subscription
# id
return '' if not self.__endpoint \
else self.__endpoint.split('/')[-1][:16]
class WebPushSubscriptionManager:
"""
WebPush Subscription Manager
"""
# Format:
# {
# "name1": {
# "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...",
# "keys": {
# "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...",
# "auth": "k9Xzm43nBGo=",
# }
# },
# "name2": {
# "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...",
# "keys": {
# "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...",
# "auth": "k9Xzm43nBGo=",
# }
# },
# Defines the number of failures we can accept before we abort and assume
# the file is bad
max_load_failure_count = 3
def __init__(self, asset: Optional['AppriseAsset'] = None) -> None:
"""
Webpush Subscription Manager
"""
# Our subscriptions
self.__subscriptions = {}
# Prepare our Asset Object
self.asset = \
asset if isinstance(asset, AppriseAsset) else AppriseAsset()
def __getitem__(self, key: str) -> WebPushSubscription:
"""
Returns our indexed value if it exists
"""
return self.__subscriptions[key.lower()]
def __setitem__(self, name: str,
subscription: Union[WebPushSubscription, str, dict]
) -> None:
"""
Set's our object if possible
"""
if not self.add(subscription, name=name.lower()):
raise AppriseInvalidData('Invalid subscription provided')
def add(self, subscription: Union[WebPushSubscription, str, dict],
name: Optional[str] = None) -> bool:
"""
Add a subscription into our manager
"""
if not isinstance(subscription, WebPushSubscription):
try:
# Support loading our object
subscription = WebPushSubscription(subscription)
except AppriseInvalidData:
return False
if name is None:
name = str(subscription)
self.__subscriptions[name.lower()] = subscription
return True
def __bool__(self) -> bool:
"""
True is returned if at least one subscription has been loaded.
"""
return True if self.__subscriptions else False
def __len__(self) -> int:
"""
Returns the number of servers loaded; this includes those found within
loaded configuration. This funtion nnever actually counts the
Config entry themselves (if they exist), only what they contain.
"""
return len(self.__subscriptions)
def __iadd__(self, subscription: Union[WebPushSubscription, str, dict]
) -> 'WebPushSubscriptionManager':
if not self.add(subscription):
raise AppriseInvalidData('Invalid subscription provided')
return self
def __contains__(self, key: str) -> bool:
"""
Checks if the key exists
"""
return key.lower() in self.__subscriptions
def clear(self) -> None:
"""
Empties our server list
"""
self.__subscriptions.clear()
@property
def dict(self) -> dict:
"""
Returns a dictionary of all entries
"""
return {k: v.dict for k, v in self.__subscriptions.items()} \
if self.__subscriptions else {}
def load(self, path: str, byte_limit=0) -> bool:
"""
Writes content to disk based on path specified. Content is a JSON
file, so ideally you may wish to have `.json' as it's extension for
clarity
if byte_limit is zero, then we do not limit our file size, otherwise
set this to the bytes you want to restrict yourself by
"""
# Reset our object
self.clear()
# Create our attachment object
attach = AppriseAttachment(asset=self.asset)
# Add our path
attach.add(path)
if byte_limit > 0:
# Enforce maximum file size
attach[0].max_file_size = byte_limit
if not attach.sync():
return False
try:
# Otherwise open our path
with open(attach[0].path, 'r', encoding='utf-8') as f:
content = json.load(f)
except (json.decoder.JSONDecodeError, TypeError, OSError):
# Could not read
return False
if not isinstance(content, dict):
# Not a list of dictionaries
return False
# Verify if we're dealing with a single element:
# {
# "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...",
# "keys": {
# "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...",
# "auth": "k9Xzm43nBGo=",
# }
# }
#
# or if we're dealing with a multiple set
#
# {
# "name1": {
# "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...",
# "keys": {
# "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...",
# "auth": "k9Xzm43nBGo=",
# }
# },
# "name2": {
# "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...",
# "keys": {
# "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...",
# "auth": "k9Xzm43nBGo=",
# }
# },
error_count = 0
if 'endpoint' in content and 'keys' in content:
if not self.add(content):
return False
else:
for name, subscription in content.items():
if not self.add(subscription, name=name.lower()):
error_count += 1
if error_count > self.max_load_failure_count:
self.clear()
return False
return True
def write(self, path: str, indent: int = 2) -> bool:
"""
Writes content to disk based on path specified. Content is a JSON
file, so ideally you may wish to have `.json' as it's extension for
clarity
"""
try:
with open(path, 'w', encoding='utf-8') as f:
json.dump(self.dict, f, indent=indent)
except (TypeError, OSError):
# Could not write content
return False
return True
def json(self, indent: int = 2) -> str:
"""
Returns JSON representation of the object
"""
return json.dumps(self.dict, indent=indent)

View File

@ -32,6 +32,33 @@ import typing
import base64
def base64_urlencode(data: bytes) -> str:
"""
URL Safe Base64 Encoding
"""
try:
return base64.urlsafe_b64encode(data).rstrip(b'=').decode('utf-8')
except TypeError:
# data is not supported; avoid raising exception
return None
def base64_urldecode(data: str) -> bytes:
"""
URL Safe Base64 Encoding
"""
try:
# Normalize base64url string (remove padding, add it back)
padding = '=' * (-len(data) % 4)
return base64.urlsafe_b64decode(data + padding)
except TypeError:
# data is not supported; avoid raising exception
return None
def decode_b64_dict(di: dict) -> dict:
"""
decodes base64 dictionary previously encoded

825
apprise/utils/pem.py Normal file
View File

@ -0,0 +1,825 @@
# -*- coding: utf-8 -*-
# BSD 2-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2025, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import os
import json
import base64
import binascii
import struct
from typing import Union, Optional
from ..utils.base64 import base64_urlencode, base64_urldecode
from ..apprise_attachment import AppriseAttachment
from ..asset import AppriseAsset
from ..logger import logger
from ..exception import ApprisePluginException
try:
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.ciphers import (
Cipher, algorithms, modes)
from cryptography.exceptions import InvalidTag
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.serialization import (
Encoding,
NoEncryption,
PrivateFormat,
PublicFormat,
)
from cryptography.hazmat.primitives.asymmetric.utils import (
decode_dss_signature
)
# PEM Support enabled
PEM_SUPPORT = True
except ImportError:
# PEM Support disabled
PEM_SUPPORT = False
class ApprisePEMException(ApprisePluginException):
"""
Thrown when there is an error with the PEM Controller
"""
def __init__(self, message, error_code=612):
super().__init__(message, error_code=error_code)
class ApprisePEMController:
"""
PEM Controller Tool for the Apprise Library
"""
# There is no reason a PEM Public Key should exceed 8K in size
# If it is more than this, then it is not accepted
max_pem_public_key_size = 8000
# There is no reason a PEM Private Key should exceed 8K in size
# If it is more than this, then it is not accepted
max_pem_private_key_size = 8000
# Maximum Vapid Message Size
max_webpush_record_size = 4096
def __init__(self, path: str,
pub_keyfile: Optional[str] = None,
prv_keyfile: Optional[str] = None,
name: Optional[str] = None,
asset: Optional[AppriseAsset] = None,
**kwargs) -> None:
"""
Path should be the directory keys can be written and read from such as
<notifyobject>.store.path
Optionally additionally specify a keyfile to explicitly open
"""
# Directory we can work with
self.path = path
# Prepare our Key Placeholders
self.__private_key = None
self.__public_key = None
# Our name (id)
self.name = name.strip(' \t/-+!$@#*').lower() \
if isinstance(name, str) else ''
# Prepare our Asset Object
self.asset = \
asset if isinstance(asset, AppriseAsset) else AppriseAsset()
# Our temporary reference points
self._prv_keyfile = AppriseAttachment(asset=self.asset)
self._pub_keyfile = AppriseAttachment(asset=self.asset)
if prv_keyfile:
self.load_private_key(prv_keyfile)
elif pub_keyfile:
self.load_public_key(pub_keyfile)
else:
self._pub_keyfile = None
def load_private_key(self, path: Optional[str] = None,
*names: str) -> bool:
"""
Load Private key and from that we can prepare our public key
"""
if path is None:
# Auto-load our content
return True if self.private_keyfile(*names) else False
# Create ourselves an Attachment to work with; this grants us the
# ability to pull this key from a remote site or anything else
# supported by the Attachment object
self._prv_keyfile = AppriseAttachment(asset=self.asset)
# Add our definition to our pem_key reference
self._prv_keyfile.add(path)
# Enforce maximum file size
self._prv_keyfile[0].max_file_size = self.max_pem_private_key_size
#
# Reset Public key
#
self._pub_keyfile = AppriseAttachment(asset=self.asset)
#
# Reset our internal keys
#
self.__private_key = None
self.__public_key = None
if not self._prv_keyfile.sync():
# Early exit
logger.error(
'Could not access PEM Private Key {}.'.format(path))
return False
try:
with open(self._prv_keyfile[0].path, "rb") as f:
self.__private_key = serialization.load_pem_private_key(
f.read(),
password=None, # or provide the password if encrypted
backend=default_backend()
)
except (ValueError, TypeError):
logger.debug(
'PEM Private Key file specified is not supported (%s)',
type(path))
return False
except FileNotFoundError:
logger.debug('PEM Private Key file not found: %s', path)
return False
except OSError as e:
logger.warning('Error accessing PEM Private Key file %s', path)
logger.debug(f'I/O Exception: {e}')
return False
#
# Generate our public key
#
self.__public_key = self.__private_key.public_key()
# Load our private key
return True if self.__private_key else False
def load_public_key(self, path: Optional[str] = None, *names: str) -> bool:
"""
Load Public key only
Note: with just a public key you can only decrypt, encryption is not
possible.
"""
if path is None:
# Auto-load our content
return True if self.public_keyfile(*names) else False
# Create ourselves an Attachment to work with; this grants us the
# ability to pull this key from a remote site or anything else
# supported by the Attachment object
self._pub_keyfile = AppriseAttachment(asset=self.asset)
# Add our definition to our pem_key reference
self._pub_keyfile.add(path)
# Enforce maximum file size
self._pub_keyfile[0].max_file_size = self.max_pem_public_key_size
#
# Reset Private key
#
self._prv_keyfile = AppriseAttachment(asset=self.asset)
#
# Reset our internal keys
#
self.__private_key = None
self.__public_key = None
if not self._pub_keyfile.sync():
# Early exit
logger.error(
'Could not access PEM Public Key {}.'.format(path))
return False
try:
with open(path, 'rb') as key_file:
self.__public_key = serialization.load_pem_public_key(
key_file.read(),
backend=default_backend()
)
except (ValueError, TypeError):
logger.debug(
'PEM Public Key file specified is not supported (%s)',
type(path))
return False
except FileNotFoundError:
# Generate keys
logger.debug('PEM Public Key file not found: %s', path)
return False
except OSError as e:
logger.warning('Error accessing PEM Public Key file %s', path)
logger.debug(f'I/O Exception: {e}')
return False
# Load our private key
return True if self.__public_key else False
def keygen(self, name: 'Optional[str]' = None, force: bool = False):
"""
Generates a set of keys based on name configured.
"""
if not PEM_SUPPORT:
msg = 'PEM Support unavailable; install cryptography library'
logger.warning(msg)
raise ApprisePEMException(msg)
# Detect if a key has been loaded or not
has_key = True if self.private_key(autogen=False) \
or self.public_key(autogen=False) else False
if (has_key and not (name or force)) or not self.path:
logger.trace(
'PEM keygen disabled, reason=%s',
'keyfile-defined' if not has_key
else 'no-write-path')
return False
# Create a new private/public key pair
self.__private_key = ec.generate_private_key(
ec.SECP256R1(), default_backend())
self.__public_key = self.__private_key.public_key()
#
# Prepare our PEM formatted output files
#
private_key = self.__private_key.private_bytes(
Encoding.PEM,
PrivateFormat.PKCS8,
encryption_algorithm=NoEncryption(),
)
public_key = self.__public_key.public_bytes(
encoding=Encoding.PEM,
format=PublicFormat.SubjectPublicKeyInfo,
)
if not name:
name = self.name
file_prefix = '' if not name else f'{name}-'
pub_path = os.path.join(self.path, f'{file_prefix}public_key.pem')
prv_path = os.path.join(self.path, f'{file_prefix}private_key.pem')
if not force:
if os.path.isfile(pub_path):
logger.debug(
'PEM generation skipped; Public Key already exists: %s/%s',
os.path.dirname(pub_path), os.path.basename(pub_path))
return False
if os.path.isfile(prv_path):
logger.debug(
'PEM generation skipped; Private Key already exists: %s%s',
os.path.dirname(prv_path), os.path.basename(prv_path))
return False
try:
# Write our keys to disk
with open(pub_path, 'wb') as f:
f.write(public_key)
except OSError as e:
logger.warning('Error writing Public PEM file %s', pub_path)
logger.debug(f'I/O Exception: {e}')
# Cleanup
try:
os.unlink(pub_path)
logger.trace('Removed %s', pub_path)
except OSError:
pass
return False
try:
with open(prv_path, 'wb') as f:
f.write(private_key)
except OSError as e:
logger.warning('Error writing Private PEM file %s', prv_path)
logger.debug(f'I/O Exception: {e}')
try:
os.unlink(pub_path)
logger.trace('Removed %s', pub_path)
except OSError:
pass
try:
os.unlink(prv_path)
logger.trace('Removed %s', prv_path)
except OSError:
pass
return False
# Update our local file references
self._prv_keyfile = AppriseAttachment(asset=self.asset)
self._prv_keyfile.add(prv_path)
self._pub_keyfile = AppriseAttachment(asset=self.asset)
self._pub_keyfile.add(pub_path)
logger.info(
'Wrote Public/Private PEM key pair for %s/%s',
os.path.dirname(pub_path),
os.path.basename(pub_path))
return True
def public_keyfile(self, *names: str) -> Optional[str]:
"""
Returns the first match of a useable public key based names provided
"""
if not PEM_SUPPORT:
msg = 'PEM Support unavailable; install cryptography library'
logger.warning(msg)
raise ApprisePEMException(msg)
if self._pub_keyfile:
# If our code reaches here, then we fetch our public key
pem_key = self._pub_keyfile[0]
if not pem_key:
# We could not access the attachment
logger.error(
'Could not access PEM Public Key {}.'.format(
pem_key.url(privacy=True)))
return False
return pem_key.path
elif not self.path:
# No path
return None
fnames = [
'public_key.pem',
'public.pem',
'pub.pem',
]
if self.name:
# Include our name in the list
fnames = [self.name] + [*names]
for name in names:
fnames.insert(0, f'{name}-public_key.pem')
_entry = name.lower()
fnames.insert(0, f'{_entry}-public_key.pem')
return next(
(os.path.join(self.path, fname)
for fname in fnames
if os.path.isfile(os.path.join(self.path, fname))),
None)
def private_keyfile(self, *names: str) -> Optional[str]:
"""
Returns the first match of a useable private key based names provided
"""
if not PEM_SUPPORT:
msg = 'PEM Support unavailable; install cryptography library'
logger.warning(msg)
raise ApprisePEMException(msg)
if self._prv_keyfile:
# If our code reaches here, then we fetch our private key
pem_key = self._prv_keyfile[0]
if not pem_key:
# We could not access the attachment
logger.error(
'Could not access PEM Private Key {}.'.format(
pem_key.url(privacy=True)))
return False
return pem_key.path
elif not self.path:
# No path
return None
fnames = [
'private_key.pem',
'private.pem',
'prv.pem',
]
if self.name:
# Include our name in the list
fnames = [self.name] + [*names]
for name in names:
fnames.insert(0, f'{name}-private_key.pem')
_entry = name.lower()
fnames.insert(0, f'{_entry}-private_key.pem')
return next(
(os.path.join(self.path, fname)
for fname in fnames
if os.path.isfile(os.path.join(self.path, fname))),
None)
def public_key(self, *names: str, autogen: Optional[bool] = None,
autodetect: bool = True
) -> Optional['ec.EllipticCurvePublicKey']:
"""
Opens a spcified pem public file and returns the key from it which
is used to decrypt the message
"""
if self.__public_key or not autodetect:
return self.__public_key
path = self.public_keyfile(*names)
if not path:
if (autogen if autogen is not None else self.asset.pem_autogen) \
and self.keygen(*names):
path = self.public_keyfile(*names)
if path:
# We should get a hit now
return self.public_key(autogen=False)
logger.warning('No PEM Public Key could be loaded')
return None
return self.__public_key if (
self.load_public_key(path) or
# Try to see if we can load a private key (which we can generate a
# public from)
self.private_key(*names, autogen=autogen)) else None
def private_key(self, *names: str, autogen: Optional[bool] = None,
autodetect: bool = True
) -> Optional['ec.EllipticCurvePrivateKey']:
"""
Opens a spcified pem private file and returns the key from it which
is used to encrypt the message
"""
if self.__private_key or not autodetect:
return self.__private_key
path = self.private_keyfile(*names)
if not path:
if (autogen if autogen is not None else self.asset.pem_autogen) \
and self.keygen(*names):
path = self.private_keyfile(*names)
if path:
# We should get a hit now
return self.private_key(autogen=False)
logger.warning('No PEM Private Key could be loaded')
return None
return self.__private_key if self.load_private_key(path) else None
def encrypt_webpush(self, message: Union[str, bytes],
public_key: 'ec.EllipticCurvePublicKey',
auth_secret: bytes) -> bytes:
"""
Encrypt a WebPush message using the recipient's public key and auth
secret.
Accepts input message as str or bytes.
"""
if isinstance(message, str):
message = message.encode('utf-8')
# 1. Generate ephemeral EC private/Public key
ephemeral_private_key = \
ec.generate_private_key(ec.SECP256R1(), default_backend())
ephemeral_public_key = ephemeral_private_key.public_key().public_bytes(
encoding=Encoding.X962, format=PublicFormat.UncompressedPoint)
# 2. Random salt
salt = os.urandom(16)
# 3. Generate shared secret via ECDH
shared_secret = ephemeral_private_key.exchange(ec.ECDH(), public_key)
# 4. Derive PRK using HKDF (first phase)
recipient_public_key_bytes = public_key.public_bytes(
encoding=Encoding.X962,
format=PublicFormat.UncompressedPoint,
)
# 5. Derive Encryption key
hkdf_secret = HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=auth_secret,
info=b"WebPush: info\x00" +
recipient_public_key_bytes + ephemeral_public_key,
backend=default_backend(),
).derive(shared_secret)
# 6. Derive Content Encryption Key
hkdf_key = HKDF(
algorithm=hashes.SHA256(),
length=16,
salt=salt,
info=b"Content-Encoding: aes128gcm\x00",
backend=default_backend(),
).derive(hkdf_secret)
# 7. Derive Nonce
hkdf_nonce = HKDF(
algorithm=hashes.SHA256(),
length=12,
salt=salt,
info=b"Content-Encoding: nonce\x00",
backend=default_backend(),
).derive(hkdf_secret)
# 8. Encrypt the message
aesgcm = AESGCM(hkdf_key)
# RFC8291 requires us to add '\0x02' byte to end of message
ciphertext = aesgcm.encrypt(
hkdf_nonce, message + b"\x02", associated_data=None)
# 9. Build WebPush header + payload
header = salt
header += struct.pack("!L", self.max_webpush_record_size)
header += struct.pack("!B", len(ephemeral_public_key))
header += ephemeral_public_key
header += ciphertext
return header
def encrypt(self,
message: Union[str, bytes],
public_key: 'Optional[ec.EllipticCurvePublicKey]' = None,
salt: Optional[bytes] = None) -> Optional[str]:
"""
Encrypts a message using the recipient's public key (or self public
key if none provided). Message can be str or bytes.
"""
if not PEM_SUPPORT:
msg = 'PEM Support unavailable; install cryptography library'
logger.warning(msg)
raise ApprisePEMException(msg)
# 1. Handle string vs bytes input
if isinstance(message, str):
message = message.encode('utf-8')
# 2. Select public key
if public_key is None:
public_key = self.public_key()
if public_key is None:
logger.debug("No public key available for encryption.")
return None
# 3. Generate ephemeral EC private key
ephemeral_private_key = ec.generate_private_key(ec.SECP256R1())
# 4. Derive shared secret
shared_secret = ephemeral_private_key.exchange(ec.ECDH(), public_key)
# 5. Derive symmetric AES key using HKDF
derived_key = HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=salt, # Allow salt=None if not provided
info=b'ecies-encryption',
backend=default_backend()
).derive(shared_secret)
# 6. Encrypt the message using AES-GCM
iv = os.urandom(12) # 96-bit random IV for GCM
encryptor = Cipher(
algorithms.AES(derived_key),
modes.GCM(iv),
backend=default_backend()
).encryptor()
ciphertext = encryptor.update(message) + encryptor.finalize()
tag = encryptor.tag
# 7. Serialize ephemeral public key as X9.62 Uncompressed Point
ephemeral_public_key_bytes = \
ephemeral_private_key.public_key().public_bytes(
encoding=serialization.Encoding.X962,
format=serialization.PublicFormat.UncompressedPoint,
)
# 8. Combine everything cleanly
full_payload = {
"ephemeral_pubkey": base64_urlencode(ephemeral_public_key_bytes),
"iv": base64_urlencode(iv),
"tag": base64_urlencode(tag),
"ciphertext": base64_urlencode(ciphertext),
}
return base64.b64encode(
json.dumps(full_payload).encode('utf-8')
).decode('utf-8')
def decrypt(self,
encrypted_payload: Union[str, bytes],
private_key: 'Optional[ec.EllipticCurvePrivateKey]' = None,
salt: Optional[bytes] = None) -> Optional[str]:
"""
Decrypts a message using the provided private key or fallback to
self's private key.
Payload is the base64-encoded JSON from encrypt().
"""
if not PEM_SUPPORT:
msg = 'PEM Support unavailable; install cryptography library'
logger.warning(msg)
raise ApprisePEMException(msg)
# 1. Parse input
try:
if isinstance(encrypted_payload, str):
payload_bytes = base64.b64decode(
encrypted_payload.encode('utf-8'))
else:
payload_bytes = base64.b64decode(encrypted_payload)
except binascii.Error:
# Bad Padding
logger.debug("Unparseable encrypted content provided")
return None
try:
payload = json.loads(payload_bytes.decode('utf-8'))
except UnicodeDecodeError:
logger.debug("Unparseable encrypted content provided")
return None
ephemeral_pubkey_bytes = base64_urldecode(payload["ephemeral_pubkey"])
iv = base64_urldecode(payload["iv"])
tag = base64_urldecode(payload["tag"])
ciphertext = base64_urldecode(payload["ciphertext"])
# 2. Select private key
if private_key is None:
private_key = self.private_key()
if private_key is None:
logger.debug("No private key available for decryption.")
return None
# 3. Load ephemeral public key from sender
ephemeral_pubkey = ec.EllipticCurvePublicKey.from_encoded_point(
ec.SECP256R1(),
ephemeral_pubkey_bytes
)
# 4. ECDH shared secret
shared_secret = private_key.exchange(ec.ECDH(), ephemeral_pubkey)
# 5. Derive symmetric AES key with HKDF
derived_key = HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
info=b'ecies-encryption',
).derive(shared_secret)
# 6. Decrypt using AES-GCM
decryptor = Cipher(
algorithms.AES(derived_key),
modes.GCM(iv, tag),
).decryptor()
try:
plaintext = decryptor.update(ciphertext) + decryptor.finalize()
except InvalidTag:
logger.debug("Decryption failed - Authentication Mismatch")
# Reason for Error:
# - Mismatched or missing salt
# - Mismatched iv, tag, or ciphertext
# - Incorrect or corrupted ephemeral_pubkey
# - Wrong or incomplete key derivation
# - Data being altered between encryption and decryption
# (truncated/corrupted)
# Basically if we get here, we tried to decrypt encrypted content
# using the wrong key.
return None
# 7. Return decoded message
return plaintext.decode('utf-8')
def sign(self, content: bytes) -> Optional[bytes]:
"""
Sign the message using ES256 (ECDSA w/ SHA256) via private key
"""
try:
# Sign the message using ES256 (ECDSA w/ SHA256)
der_sig = self.private_key()\
.sign(content, ec.ECDSA(hashes.SHA256()))
except AttributeError:
# NoneType; could not load key
return None
# Convert DER to raw R||S
r, s = decode_dss_signature(der_sig)
return r.to_bytes(
32, byteorder='big') + s.to_bytes(32, byteorder='big')
@property
def pub_keyfile(self) -> Optional[Union[str, bool]]:
"""
Returns the Public Keyfile Path if set otherwise it returns None
This property returns False if a keyfile was provided, but was invalid
"""
return None if not self._pub_keyfile \
else (False if not self._pub_keyfile[0]
else self._pub_keyfile[0].path)
@property
def prv_keyfile(self) -> Optional[Union[str, bool]]:
"""
Returns the Private Keyfile Path if set otherwise it returns None
This property returns False if a keyfile was provided, but was invalid
"""
return None if not self._prv_keyfile \
else (False if not self._prv_keyfile[0]
else self._prv_keyfile[0].path)
@property
def x962_str(self) -> str:
"""
X962 serialization based on public key
"""
try:
return base64_urlencode(
self.public_key().public_bytes(
encoding=serialization.Encoding.X962,
format=serialization.PublicFormat.UncompressedPoint)
)
except AttributeError:
# Public Key could not be generated (public_key() returned None)
return ''
def __bool__(self) -> bool:
"""
Returns True if at least 1 key was loaded
"""
return True if (self.private_key() or self.public_key()) else False

View File

@ -43,7 +43,7 @@ Africas Talking, Apprise API, APRS, AWS SES, AWS SNS, Bark, BlueSky, Burst SMS,
BulkSMS, BulkVS, Chanify, Clickatell, ClickSend, DAPNET, DingTalk, Discord, E-Mail, Emby,
FCM, Feishu, Flock, Free Mobile, Google Chat, Gotify, Growl, Guilded, Home
Assistant, httpSMS, IFTTT, Join, Kavenegar, KODI, Kumulos, LaMetric, Line,
LunaSea, MacOSX, Mailgun, Mastodon, Mattermost,Matrix, MessageBird, Microsoft
LunaSea, MacOSX, Mailgun, Mastodon, Mattermost, Matrix, MessageBird, Microsoft
Windows, Microsoft Teams, Misskey, MQTT, MSG91, MyAndroid, Nexmo, Nextcloud,
NextcloudTalk, Notica, Notifiarr, Notifico, ntfy, Office365, OneSignal,
Opsgenie, PagerDuty, PagerTree, ParsePlatform, Plivo, PopcornNotify, Prowl,
@ -51,8 +51,8 @@ Pushalot, PushBullet, Pushjet, PushMe, Pushover, PushSafer, Pushy, PushDeer,
Revolt, Reddit, Resend, Rocket.Chat, RSyslog, SendGrid, ServerChan, Seven, SFR,
Signal, SimplePush, Sinch, Slack, SMSEagle, SMS Manager, SMTP2Go, SparkPost,
Splunk, Super Toasty, Streamlabs, Stride, Synology Chat, Syslog, Techulus Push,
Telegram, Threema Gateway, Twilio, Twitter, Twist, VictorOps, Voipms, Vonage,
WeCom Bot, WhatsApp, Webex Teams, Workflows, WxPusher, XBMC}
Telegram, Threema Gateway, Twilio, Twitter, Twist, Vapid, VictorOps, Voipms,
Vonage, WebPush, WeCom Bot, WhatsApp, Webex Teams, Workflows, WxPusher, XBMC}
Name: python-%{pypi_name}
Version: 1.9.3

View File

@ -2752,6 +2752,30 @@ def test_cwe312_url():
'#random') == 'slack://test@B...4/J...M/X...3/'
def test_base64_encode_decode():
"""
Utils:Base64:URLEncode & Decode
"""
assert utils.base64.base64_urlencode(None) is None
assert utils.base64.base64_urlencode(42) is None
assert utils.base64.base64_urlencode(object) is None
assert utils.base64.base64_urlencode({}) is None
assert utils.base64.base64_urlencode("") is None
assert utils.base64.base64_urlencode("abc") is None
assert utils.base64.base64_urlencode(b"") == ''
assert utils.base64.base64_urlencode(b"abc") == 'YWJj'
assert utils.base64.base64_urldecode(None) is None
assert utils.base64.base64_urldecode(42) is None
assert utils.base64.base64_urldecode(object) is None
assert utils.base64.base64_urldecode({}) is None
assert utils.base64.base64_urldecode("abc") == b'i\xb7'
assert utils.base64.base64_urldecode("") == b''
assert utils.base64.base64_urldecode('YWJj') == b'abc'
def test_dict_base64_codec(tmpdir):
"""
Test encoding/decoding of base64 content

View File

@ -248,6 +248,12 @@ def test_attach_file():
# Test hosted configuration and that we can't add a valid file
aa = AppriseAttachment(location=ContentLocation.HOSTED)
# No entries defined yet
assert bool(aa) is False
assert aa.sync() is False
# Entry count does not impact sync if told to act that way
assert aa.sync(abort_if_empty=False) is True
assert aa.add(path) is False
response = AppriseAttachment.instantiate(path)

697
test/test_plugin_vapid.py Normal file
View File

@ -0,0 +1,697 @@
# -*- coding: utf-8 -*-
# BSD 2-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2025, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import os
import sys
import json
import requests
import pytest
from unittest import mock
from apprise.plugins.vapid.subscription import (
WebPushSubscription, WebPushSubscriptionManager)
from apprise.plugins.vapid import NotifyVapid
from apprise import exception, asset, url
from apprise.common import PersistentStoreMode
from apprise.utils.pem import ApprisePEMController
from helpers import AppriseURLTester
# Disable logging for a cleaner testing output
import logging
logging.disable(logging.CRITICAL)
# Attachment Directory
TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var')
# a test UUID we can use
SUBSCRIBER = 'user@example.com'
PLUGIN_ID = 'vapid'
# Our Testing URLs
apprise_url_tests = (
('vapid://', {
'instance': TypeError,
}),
('vapid://:@/', {
'instance': TypeError,
}),
('vapid://invalid-subscriber', {
# An invalid Subscriber
'instance': TypeError,
}),
('vapid://user@example.com', {
# bare bone requirements met, but we don't have our subscription file
# or our private key (pem)
'instance': NotifyVapid,
# We'll fail to respond because we would not have found any
# configuration to load
'notify_response': False,
}),
('vapid://user@example.com?keyfile=invalid&subfile=invalid', {
# Test passing keyfile and subfile on our path (even if invalid)
'instance': NotifyVapid,
# We'll fail to respond because we would not have found any
# configuration to load
'notify_response': False,
}),
('vapid://user@example.com/newuser@example.com', {
# we don't have our subscription file or private key
'instance': NotifyVapid,
'notify_response': False,
}),
('vapid://user@example.ca/newuser@example.ca', {
'instance': NotifyVapid,
# force a failure
'response': False,
'requests_response_code': requests.codes.internal_server_error,
}),
('vapid://user@example.uk/newuser@example.uk', {
'instance': NotifyVapid,
# throw a bizzare code forcing us to fail to look it up
'response': False,
'requests_response_code': 999,
}),
('vapid://user@example.au/newuser@example.au', {
'instance': NotifyVapid,
# Throws a series of connection and transfer exceptions when this flag
# is set and tests that we gracfully handle them
'test_requests_exceptions': True,
}),
)
@pytest.fixture
def patch_persistent_store_namespace(tmpdir):
"""
Force an easy to test environment
"""
with mock.patch.object(url.URLBase, 'url_id', return_value=PLUGIN_ID), \
mock.patch.object(
asset.AppriseAsset, 'storage_mode',
PersistentStoreMode.AUTO), \
mock.patch.object(
asset.AppriseAsset, 'storage_path', str(tmpdir)):
tmp_dir = tmpdir.mkdir(PLUGIN_ID)
# Return the directory name
yield str(tmp_dir)
@pytest.fixture
def subscription_reference():
return {
"user@example.com": {
"endpoint": 'https://fcm.googleapis.com/fcm/send/default',
"keys": {
"p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
'5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
"auth": 'k9Xzm43nBGo=',
},
},
"user1": {
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
'5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
"auth": 'k9Xzm43nBGo=',
},
},
"user2": {
"endpoint": 'https://fcm.googleapis.com/fcm/send/def456',
"keys": {
"p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
'5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
"auth": 'k9Xzm43nBGo=',
},
},
}
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_vapid_urls():
"""
NotifyVapid() Apprise URLs - No Config
"""
# Run our general tests
AppriseURLTester(tests=apprise_url_tests).run_all()
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_vapid_urls_with_required_assets(
patch_persistent_store_namespace, subscription_reference):
"""
NotifyVapid() Apprise URLs With Config
"""
# Determine our store
pc = ApprisePEMController(path=patch_persistent_store_namespace)
assert pc.keygen() is True
# Write our subscriptions file to disk
subscription_file = os.path.join(
patch_persistent_store_namespace,
NotifyVapid.vapid_subscription_file)
with open(subscription_file, 'w') as f:
f.write(json.dumps(subscription_reference))
tests = (
('vapid://user@example.com', {
# user@example.com loaded (also used as subscriber id)
'instance': NotifyVapid,
}),
('vapid://user@example.com/newuser@example.com', {
# no newuser@example.com key entry
'instance': NotifyVapid,
'notify_response': False,
}),
('vapid://user@example.com/user1?to=user2', {
# We'll succesfully notify 2 users
'instance': NotifyVapid,
}),
('vapid://user1?to=user2&from=user@example.com', {
# We'll succesfully notify 2 users
'instance': NotifyVapid,
}),
('vapid://?to=user2&from=user@example.com', {
# No host provided
'instance': NotifyVapid,
}),
('vapid://user@example.com?to=user2&from=user@example.com', {
# We'll succesfully notify 2 users
'instance': NotifyVapid,
}),
('vapid://user@example.com/user1?to=user2&ttl=15', {
# test ttl
'instance': NotifyVapid,
}),
('vapid://user@example.com/user1?to=user2&ttl=', {
# test ttl
'instance': NotifyVapid,
}),
('vapid://user@example.com/user1?to=user2&ttl=invalid', {
# test ttl
'instance': NotifyVapid,
}),
('vapid://user@example.com/user1?to=user2&ttl=-4000', {
# bad ttl
'instance': TypeError,
}),
('vapid://user@example.com/user1?to=user2&mode=edge', {
# test mode
'instance': NotifyVapid,
}),
('vapid://user@example.com/user1?to=user2&mode=', {
# test mode
'instance': TypeError,
}),
('vapid://user@example.com/user1?to=user2&mode=invalid', {
# test mode more
'instance': TypeError,
}),
('vapid://user@example.com/user1', {
'instance': NotifyVapid,
# force a failure
'response': False,
'requests_response_code': requests.codes.internal_server_error,
}),
('vapid://user@example.com/user1', {
'instance': NotifyVapid,
# throw a bizzare code forcing us to fail to look it up
'response': False,
'requests_response_code': 999,
}),
('vapid://user@example.com/user1', {
'instance': NotifyVapid,
# Throws a series of connection and transfer exceptions
# when this flag is set and tests that we gracfully handle them
'test_requests_exceptions': True,
}),
)
AppriseURLTester(tests=tests).run_all()
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_vapid_subscriptions(tmpdir):
"""
NotifyVapid() Subscriptions
"""
# Temporary directory
tmpdir0 = tmpdir.mkdir('tmp00')
with pytest.raises(exception.AppriseInvalidData):
# Integer not supported
WebPushSubscription(42)
with pytest.raises(exception.AppriseInvalidData):
# Not the correct format
WebPushSubscription('bad-content')
with pytest.raises(exception.AppriseInvalidData):
# Invalid JSON
WebPushSubscription('{')
with pytest.raises(exception.AppriseInvalidData):
# Empty Dictionary
WebPushSubscription({})
with pytest.raises(exception.AppriseInvalidData):
WebPushSubscription({
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 'BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR=',
"auth": 42,
},
})
with pytest.raises(exception.AppriseInvalidData):
WebPushSubscription({
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 42,
"auth": 'k9Xzm43nBGo=',
},
})
with pytest.raises(exception.AppriseInvalidData):
WebPushSubscription({
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
})
with pytest.raises(exception.AppriseInvalidData):
WebPushSubscription({
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {},
})
with pytest.raises(exception.AppriseInvalidData):
# Invalid p256dh public key provided
wps = WebPushSubscription({
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 'BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR=',
"auth": 'k9Xzm43nBGo=',
},
})
# An empty object
wps = WebPushSubscription()
assert bool(wps) is False
assert isinstance(wps.json(), str)
assert json.loads(wps.json())
assert str(wps) == ''
assert wps.auth is None
assert wps.endpoint is None
assert wps.p256dh is None
assert wps.public_key is None
# We can't write anything as there is nothing loaded
assert wps.write(os.path.join(str(tmpdir0), 'subscriptions.json')) is False
# A valid key
wps = WebPushSubscription({
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
'5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
"auth": 'k9Xzm43nBGo=',
},
})
assert bool(wps) is True
assert isinstance(wps.json(), str)
assert json.loads(wps.json())
assert str(wps) == 'abc123'
assert wps.auth == 'k9Xzm43nBGo='
assert wps.endpoint == 'https://fcm.googleapis.com/fcm/send/abc123'
assert wps.p256dh == 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO' \
'5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0'
assert wps.public_key is not None
# Currently no files here
assert os.listdir(str(tmpdir0)) == []
# Bad content
assert wps.write(object) is False
assert wps.write(None) is False
# Can't write to a name already taken by as a directory
assert wps.write(str(tmpdir0)) is False
# Can't write to a name already taken by as a directory
assert wps.write(os.path.join(str(tmpdir0), 'subscriptions.json')) is True
assert os.listdir(str(tmpdir0)) == ['subscriptions.json']
@pytest.mark.skipif(
'cryptography' in sys.modules,
reason="Requires that cryptography NOT be installed")
def test_plugin_vapid_subscriptions_without_c():
"""
NotifyVapid() Subscriptions (no Cryptography)
"""
with pytest.raises(exception.AppriseInvalidData):
# A valid key that can't be loaded because crytography is missing
WebPushSubscription({
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
'5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
"auth": 'k9Xzm43nBGo=',
},
})
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_vapid_subscription_manager(tmpdir):
"""
NotifyVapid() Subscription Manager
"""
# Temporary directory
tmpdir0 = tmpdir.mkdir('tmp00')
with pytest.raises(exception.AppriseInvalidData):
# An invalid object
smgr = WebPushSubscriptionManager()
smgr['abc'] = 'invalid'
with pytest.raises(exception.AppriseInvalidData):
# An invalid object
smgr = WebPushSubscriptionManager()
smgr += 'invalid'
smgr = WebPushSubscriptionManager()
assert bool(smgr) is False
assert len(smgr) == 0
sub = {
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
'5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
"auth": 'k9Xzm43nBGo=',
},
}
assert smgr.add(sub) is True
assert bool(smgr) is True
assert len(smgr) == 1
# Same sub (overwrites same slot)
smgr += sub
assert bool(smgr) is True
assert len(smgr) == 1
# This makes a copy
smgr['abc'] = smgr['abc123']
assert bool(smgr) is True
assert len(smgr) == 2
assert isinstance(smgr['abc123'], WebPushSubscription)
# Currently no files here
assert os.listdir(str(tmpdir0)) == []
# Write our content
assert smgr.write(
os.path.join(str(tmpdir0), 'subscriptions.json')) is True
assert os.listdir(str(tmpdir0)) == ['subscriptions.json']
# Reset our object
smgr.clear()
assert bool(smgr) is False
assert len(smgr) == 0
# Load our content back
assert smgr.load(
os.path.join(str(tmpdir0), 'subscriptions.json')) is True
assert bool(smgr) is True
assert len(smgr) == 2
# Write over our file using the standard Subscription format
assert smgr['abc123'].write(
os.path.join(str(tmpdir0), 'subscriptions.json')) is True
# We can still open this type as well
assert smgr.load(
os.path.join(str(tmpdir0), 'subscriptions.json')) is True
assert bool(smgr) is True
assert len(smgr) == 1
smgr.clear()
bad_entry = {
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 'invalid',
"auth": 'garbage',
},
}
subscriptions = os.path.join(str(tmpdir0), 'subscriptions.json')
with open(subscriptions, 'w', encoding='utf-8') as f:
# A bad JSON file
f.write('{')
assert smgr.load(subscriptions) is False
with open(subscriptions, 'w', encoding='utf-8') as f:
# not expected dictionary
f.write('null')
assert smgr.load(subscriptions) is False
subscriptions = os.path.join(str(tmpdir0), 'subscriptions.json')
with open(subscriptions, 'w', encoding='utf-8') as f:
json.dump(bad_entry, f)
assert smgr.load(subscriptions) is False
# Create bad data
bad_data = {
'bad1': bad_entry,
'bad2': bad_entry,
'bad3': bad_entry,
'bad4': bad_entry,
}
subscriptions = os.path.join(str(tmpdir0), 'subscriptions.json')
with open(subscriptions, 'w', encoding='utf-8') as f:
json.dump(bad_data, f)
assert smgr.load(subscriptions) is False
assert smgr.load('invalid-file') is False
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
@mock.patch('requests.post')
def test_plugin_vapid_initializations(mock_post, tmpdir):
"""
NotifyVapid() Initializations
"""
# Assign our mock object our return value
okay_response = requests.Request()
okay_response.status_code = requests.codes.ok
okay_response.content = ""
mock_post.return_value = okay_response
# Temporary directory
tmpdir0 = tmpdir.mkdir('tmp00')
# Write our subfile
smgr = WebPushSubscriptionManager()
sub = {
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
'5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
"auth": 'k9Xzm43nBGo=',
},
}
subfile = os.path.join(str(tmpdir0), 'subscriptions.json')
assert smgr.add(sub) is True
assert smgr.add(smgr['abc123']) is True
assert os.listdir(str(tmpdir0)) == []
with mock.patch('json.dump', side_effect=OSError):
# We will fial to write
assert smgr.write(subfile) is False
assert smgr.write(subfile) is True
assert os.listdir(str(tmpdir0)) == ['subscriptions.json']
assert isinstance(smgr.json(), str)
_asset = asset.AppriseAsset(
storage_mode=PersistentStoreMode.FLUSH,
storage_path=str(tmpdir0),
# Auto-gen our private/public key pair
pem_autogen=True,
)
# Auto-Key Generation
obj = NotifyVapid(
'user@example.ca', targets=['abc123', ], subfile=subfile,
asset=_asset)
assert isinstance(obj, NotifyVapid)
# Our subscription directory + our
# persistent store where our keys were generated
assert len(os.listdir(str(tmpdir0))) == 2
# Second call re-references keys previously generated
obj = NotifyVapid(
'user@example.ca', targets=['abc123', ], subfile=subfile,
asset=_asset)
assert isinstance(obj, NotifyVapid)
assert isinstance(obj.url(), str)
assert obj.send('test') is True
# A second message makes no difference; what is loaded into memory is used
assert obj.send('test') is True
obj = NotifyVapid(
'user@example.ca', targets=['abc123', ], subfile='/a/bad/path',
asset=_asset)
assert isinstance(obj, NotifyVapid)
assert isinstance(obj.url(), str)
assert obj.send('test') is False
# A second message makes no difference; what is loaded into memory is used
assert obj.send('test') is False
# Detect our keyfile
cache_dir = [x for x in os.listdir(str(tmpdir0))
if not x.endswith('subscriptions.json')][0]
# Test fixed assignment to our keyfile
keyfile = os.path.join(str(tmpdir0), cache_dir, 'private_key.pem')
assert os.path.exists(keyfile)
obj = NotifyVapid(
'user@example.ca', targets=['abc123', ], keyfile=keyfile,
subfile=subfile, asset=_asset)
assert isinstance(obj, NotifyVapid)
assert isinstance(obj.url(), str)
assert obj.send('test') is True
# A second message makes no difference; what is loaded into memory is used
assert obj.send('test') is True
# Invalid Keyfile
obj = NotifyVapid(
'user@example.ca', targets=['abc123', ], keyfile=subfile,
subfile=subfile, asset=_asset)
assert isinstance(obj, NotifyVapid)
assert isinstance(obj.url(), str)
assert obj.send('test') is False
# A second message makes no difference; what is loaded into memory is used
assert obj.send('test') is False
# AutoGen Temporary directory
tmpdir1 = tmpdir.mkdir('tmp01')
_asset2 = asset.AppriseAsset(
storage_mode=PersistentStoreMode.FLUSH,
storage_path=str(tmpdir1),
# Auto-gen our private/public key pair
pem_autogen=True,
)
assert os.listdir(str(tmpdir1)) == []
obj = NotifyVapid(
'user@example.ca', targets=['abc123', ], keyfile=keyfile,
asset=_asset2)
assert isinstance(obj, NotifyVapid)
assert isinstance(obj.url(), str)
# We have a temporary subscription file we can use
assert os.listdir(str(tmpdir1)) == ['00088ad3']
# We will have a dud configuration file, but at least it's something
# to help the user with
assert obj.send('test') is False
# Second instance fails as well
assert obj.send('test') is False
# AutoGen Temporary directory
tmpdir2 = tmpdir.mkdir('tmp02')
_asset3 = asset.AppriseAsset(
storage_mode=PersistentStoreMode.FLUSH,
storage_path=str(tmpdir2),
# Auto-gen our private/public key pair
pem_autogen=True,
)
# Test invalid keyfile
assert os.path.exists(keyfile)
obj = NotifyVapid(
'user@example.ca', targets=['abc123', ], keyfile='invalid-file',
subfile=subfile, asset=_asset3)
assert isinstance(obj, NotifyVapid)
assert isinstance(obj.url(), str)
assert obj.send('test') is False
# A second message makes no difference; what is loaded into memory is used
assert obj.send('test') is False
@pytest.mark.skipif(
'cryptography' in sys.modules,
reason="Requires that cryptography NOT be installed")
def test_plugin_vapid_initializations_without_c(tmpdir):
"""
NotifyVapid() Initializations without cryptography
"""
# Temporary directory
tmpdir0 = tmpdir.mkdir('tmp00')
# Write our subfile
smgr = WebPushSubscriptionManager()
sub = {
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
'5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
"auth": 'k9Xzm43nBGo=',
},
}
subfile = os.path.join(str(tmpdir0), 'subscriptions.json')
assert smgr.add(sub) is False
_asset = asset.AppriseAsset(
storage_mode=PersistentStoreMode.FLUSH,
storage_path=str(tmpdir0),
# Auto-gen our private/public key pair
pem_autogen=True,
)
# Auto-Key Generation
obj = NotifyVapid(
'user@example.ca', targets=['abc123', ], subfile=subfile,
asset=_asset)
assert isinstance(obj, NotifyVapid)

562
test/test_utils_pem.py Normal file
View File

@ -0,0 +1,562 @@
# -*- coding: utf-8 -*-
# BSD 2-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2025, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import logging
import os
import sys
import pytest
from unittest import mock
from apprise import AppriseAsset
from apprise import PersistentStoreMode
from apprise import utils
# Disable logging for a cleaner testing output
logging.disable(logging.CRITICAL)
# Attachment Directory
TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var')
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_utils_pem_general(tmpdir):
"""
Utils:PEM
"""
# string to manipulate/work with
unencrypted_str = "message"
tmpdir0 = tmpdir.mkdir('tmp00')
# Currently no files here
assert os.listdir(str(tmpdir0)) == []
asset = AppriseAsset(
storage_mode=PersistentStoreMode.MEMORY,
storage_path=str(tmpdir0),
pem_autogen=False,
)
# Create a PEM Controller
pem_c = utils.pem.ApprisePEMController(path=None, asset=asset)
# Nothing to lookup
assert pem_c.public_keyfile() is None
assert pem_c.public_key() is None
assert pem_c.x962_str == ''
assert pem_c.decrypt(b'data') is None
assert pem_c.encrypt(unencrypted_str) is None
# Keys can not be generated in memory mode
assert pem_c.keygen() is False
assert pem_c.sign(b'data') is None
asset = AppriseAsset(
storage_mode=PersistentStoreMode.FLUSH,
storage_path=str(tmpdir0),
pem_autogen=False,
)
# No new files
assert os.listdir(str(tmpdir0)) == []
# Our asset is now write mode, so we will be able to generate a key
pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
# Nothing to lookup
assert pem_c.public_keyfile() is None
assert pem_c.public_key() is None
assert pem_c.x962_str == ''
assert pem_c.encrypt(unencrypted_str) is None
# Generate our keys
assert bool(pem_c) is False
assert pem_c.keygen() is True
assert bool(pem_c) is True
# We have 2 new key files generated
pub_keyfile = os.path.join(str(tmpdir0), 'public_key.pem')
prv_keyfile = os.path.join(str(tmpdir0), 'private_key.pem')
assert os.path.isfile(pub_keyfile)
assert os.path.isfile(prv_keyfile)
assert pem_c.public_keyfile() is not None
assert pem_c.decrypt("garbage") is None
assert pem_c.public_key() is not None
# Keys used later on as ref
pubkey_ref = pem_c.public_key()
prvkey_ref = pem_c.private_key()
assert isinstance(pem_c.x962_str, str)
assert len(pem_c.x962_str) > 20
content = pem_c.encrypt(unencrypted_str)
assert pem_c.decrypt(pem_c.encrypt(unencrypted_str.encode('utf-8'))) \
== pem_c.decrypt(pem_c.encrypt(unencrypted_str))
assert pem_c.decrypt(
pem_c.encrypt(unencrypted_str, public_key=pem_c.public_key())) \
== pem_c.decrypt(pem_c.encrypt(unencrypted_str))
assert pem_c.decrypt(content) == unencrypted_str
assert isinstance(content, str)
assert pem_c.decrypt(content) == unencrypted_str
# support str as well
assert pem_c.decrypt(content) == unencrypted_str
assert pem_c.decrypt(content.encode('utf-8')) == unencrypted_str
# Sign test
assert isinstance(pem_c.sign(content.encode('utf-8')), bytes)
# Web Push handling
webpush_content = pem_c.encrypt_webpush(
unencrypted_str,
public_key=pem_c.public_key(),
auth_secret=b'secret')
assert isinstance(webpush_content, bytes)
webpush_content = pem_c.encrypt_webpush(
unencrypted_str.encode('utf-8'),
public_key=pem_c.public_key(),
auth_secret=b'secret')
assert isinstance(webpush_content, bytes)
# Non Bytes (garbage basically)
with pytest.raises(TypeError):
assert pem_c.decrypt(None) is None
with pytest.raises(TypeError):
assert pem_c.decrypt(5) is None
with pytest.raises(TypeError):
assert pem_c.decrypt(False) is None
with pytest.raises(TypeError):
assert pem_c.decrypt(object) is None
# Test our initialization
pem_c = utils.pem.ApprisePEMController(
path=None,
prv_keyfile='invalid',
asset=asset)
assert pem_c.private_keyfile() is False
assert pem_c.public_keyfile() is None
assert pem_c.prv_keyfile is False
assert pem_c.pub_keyfile is None
assert pem_c.private_key() is None
assert pem_c.public_key() is None
assert pem_c.decrypt(content) is None
pem_c = utils.pem.ApprisePEMController(
path=None,
pub_keyfile='invalid',
asset=asset)
assert pem_c.private_keyfile() is None
assert pem_c.public_keyfile() is False
assert pem_c.prv_keyfile is None
assert pem_c.pub_keyfile is False
assert pem_c.private_key() is None
assert pem_c.public_key() is None
assert pem_c.decrypt(content) is None
pem_c = utils.pem.ApprisePEMController(
path=None,
prv_keyfile=prv_keyfile,
asset=asset)
assert pem_c.private_keyfile() == prv_keyfile
assert pem_c.public_keyfile() is None
assert pem_c.private_key() is not None
assert pem_c.prv_keyfile == prv_keyfile
assert pem_c.pub_keyfile is None
assert pem_c.public_key() is not None
assert pem_c.decrypt(content) == unencrypted_str
pem_c = utils.pem.ApprisePEMController(
path=None,
pub_keyfile=pub_keyfile,
asset=asset)
assert pem_c.private_keyfile() is None
assert pem_c.public_keyfile() == pub_keyfile
assert pem_c.prv_keyfile is None
assert pem_c.pub_keyfile == pub_keyfile
assert pem_c.private_key() is None
assert pem_c.public_key() is not None
assert pem_c.decrypt(content) is None
# Test our path references
pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
assert pem_c.load_private_key(path=None) is True
assert pem_c.private_keyfile() == prv_keyfile
assert pem_c.prv_keyfile is None
assert pem_c.pub_keyfile is None
assert pem_c.decrypt(content) == unencrypted_str
# Generate a new key referencing another location
pem_c = utils.pem.ApprisePEMController(
name='keygen-tests',
path=str(tmpdir0), asset=asset)
# generate ourselves some keys
assert pem_c.keygen() is True
keygen_prv_file = pem_c.prv_keyfile
keygen_pub_file = pem_c.pub_keyfile
# Remove 1 (but not both)
os.unlink(keygen_pub_file)
pem_c = utils.pem.ApprisePEMController(
name='keygen-tests',
path=str(tmpdir0), asset=asset)
# Private key was found, so this does not work
assert pem_c.keygen() is False
os.unlink(keygen_prv_file)
pem_c = utils.pem.ApprisePEMController(
name='keygen-tests',
path=str(tmpdir0), asset=asset)
# It works now
assert pem_c.keygen() is True
# Tests public_key generation failure only
with mock.patch('builtins.open', side_effect=OSError()):
assert pem_c.keygen(force=True) is False
with mock.patch('os.unlink', side_effect=OSError()):
assert pem_c.keygen(force=True) is False
with mock.patch('os.unlink', return_value=True):
assert pem_c.keygen(force=True) is False
# Tests private key generation
side_effect = [
mock.mock_open(read_data="file contents").return_value] + \
[OSError() for _ in range(10)]
with mock.patch('builtins.open', side_effect=side_effect):
assert pem_c.keygen(force=True) is False
with mock.patch('builtins.open', side_effect=side_effect):
with mock.patch('os.unlink', side_effect=OSError()):
assert pem_c.keygen(force=True) is False
with mock.patch('builtins.open', side_effect=side_effect):
with mock.patch('os.unlink', return_value=True):
assert pem_c.keygen(force=True) is False
# Generate a new key referencing another location
pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
# We can't re-generate keys if ones already exist
assert pem_c.keygen() is False
# the keygen is the big difference here
assert pem_c.keygen(name='test') is True
# under the hood, a key is not regenerated (as one already exists)
assert pem_c.keygen(name='test') is False
# Generate it a second time by force
assert pem_c.keygen(name='test', force=True) is True
assert pem_c.private_keyfile() == os.path.join(
str(tmpdir0), 'test-private_key.pem')
assert pem_c.public_keyfile() == os.path.join(
str(tmpdir0), 'test-public_key.pem')
assert pem_c.private_key() is not None
assert pem_c.public_key() is not None
assert pem_c.prv_keyfile == os.path.join(
str(tmpdir0), 'test-private_key.pem')
assert pem_c.pub_keyfile == os.path.join(
str(tmpdir0), 'test-public_key.pem')
# 'content' was generated using a different key and can not be
# decrypted
assert pem_c.decrypt(content) is None
# Test Decryption files
pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
# Calling decrypt triggers underlining code to auto-load
assert pem_c.decrypt(content) == unencrypted_str
# Using a private key by path
assert pem_c.decrypt(
content, private_key=pem_c.private_key()) == unencrypted_str
# Test different edge cases of load_private_key()
pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
assert pem_c.load_private_key() is True
pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
assert pem_c.load_private_key(path=prv_keyfile) is True
pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
with mock.patch('builtins.open', side_effect=TypeError()):
assert pem_c.load_private_key(path=prv_keyfile) is False
with mock.patch('builtins.open', side_effect=OSError()):
assert pem_c.load_private_key(path=prv_keyfile) is False
with mock.patch('builtins.open', side_effect=FileNotFoundError()):
assert pem_c.load_private_key(path=prv_keyfile) is False
# Test different edge cases of load_public_key()
pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
assert pem_c.load_public_key() is True
pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
assert pem_c.load_public_key(path=pub_keyfile) is True
pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
with mock.patch('builtins.open', side_effect=TypeError()):
assert pem_c.load_public_key(path=pub_keyfile) is False
with mock.patch('builtins.open', side_effect=OSError()):
assert pem_c.load_public_key(path=pub_keyfile) is False
with mock.patch('builtins.open', side_effect=FileNotFoundError()):
assert pem_c.load_public_key(path=pub_keyfile) is False
pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
assert pem_c.public_keyfile('test1', 'test2') == pub_keyfile
assert pem_c.private_keyfile('test1', 'test2') == prv_keyfile
pem_c = utils.pem.ApprisePEMController(
path=str(tmpdir0), name='pub1', asset=asset)
assert pem_c.public_key(autogen=True)
pem_c = utils.pem.ApprisePEMController(
path=str(tmpdir0), name='pub2', asset=asset)
assert pem_c.private_key(autogen=True)
#
# Auto key generation turned on
#
asset = AppriseAsset(
storage_mode=PersistentStoreMode.MEMORY,
storage_path=str(tmpdir0),
pem_autogen=True,
)
pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
assert pem_c.load_public_key(path=pub_keyfile) is True
pem_c = utils.pem.ApprisePEMController(path=None, asset=asset)
assert pem_c.load_public_key(path=pub_keyfile) is True
tmpdir1 = tmpdir.mkdir('tmp01')
# Currently no files here
assert os.listdir(str(tmpdir1)) == []
asset = AppriseAsset(
storage_mode=PersistentStoreMode.MEMORY,
storage_path=str(tmpdir1),
pem_autogen=False,
)
# Auto-Gen is turned off, so weare not successful here
pem_c = utils.pem.ApprisePEMController(path=None, asset=asset)
assert pem_c.public_key() is None
assert pem_c.private_key() is None
pem_c = utils.pem.ApprisePEMController(path=str(tmpdir1), asset=asset)
assert pem_c.public_key() is None
assert pem_c.private_key() is None
asset = AppriseAsset(
storage_mode=PersistentStoreMode.FLUSH,
storage_path=str(tmpdir1),
pem_autogen=True,
)
pem_c = utils.pem.ApprisePEMController(path=str(tmpdir1), asset=asset)
# Generate ourselves a private key
assert pem_c.public_key() is not None
assert pem_c.private_key() is not None
pub_keyfile = os.path.join(str(tmpdir1), 'public_key.pem')
prv_keyfile = os.path.join(str(tmpdir1), 'private_key.pem')
assert os.path.isfile(pub_keyfile)
assert os.path.isfile(prv_keyfile)
with open(pub_keyfile, 'w') as f:
f.write('garbage')
pem_c = utils.pem.ApprisePEMController(path=str(tmpdir1), asset=asset)
# we can still load our data as the public key is generated
# from the private
assert pem_c.public_key() is not None
assert pem_c.private_key() is not None
tmpdir2 = tmpdir.mkdir('tmp02')
pem_c = utils.pem.ApprisePEMController(path=str(tmpdir2), asset=asset)
pub_keyfile = os.path.join(str(tmpdir2), 'public_key.pem')
prv_keyfile = os.path.join(str(tmpdir2), 'private_key.pem')
assert not os.path.isfile(pub_keyfile)
assert not os.path.isfile(prv_keyfile)
#
# Public Key Edge Case Tests
#
with \
mock.patch.object(
pem_c, 'public_keyfile', side_effect=[None, pub_keyfile]) \
as mock_keyfile, \
mock.patch.object(
pem_c, 'keygen', side_effect=lambda *_, **__: setattr(
pem_c, '_ApprisePEMController__public_key',
pubkey_ref) or True) as mock_keygen, \
mock.patch.object(
pem_c, 'load_public_key', return_value=True):
result = pem_c.public_key()
assert result is pubkey_ref
assert mock_keyfile.call_count == 2
mock_keygen.assert_called_once()
# - First call: None → triggers keygen
# - Second call (recursive): None → causes fallback
public_keyfile_side_effect = [None, None]
with mock.patch.object(
pem_c, 'public_keyfile', side_effect=public_keyfile_side_effect) \
as mock_keyfile, \
mock.patch.object(pem_c, 'keygen', return_value=True) \
as mock_keygen, \
mock.patch.object(pem_c, 'load_public_key', return_value=False) \
as mock_load:
# Ensure no key is preset initially
setattr(pem_c, '_ApprisePEMController__public_key', None)
result = pem_c.public_key()
assert result is None
# Once in outer call, once in recursive
assert mock_keyfile.call_count == 2
mock_keygen.assert_called_once()
mock_load.assert_not_called()
#
# Private Key Edge Case Tests
#
with \
mock.patch.object(
pem_c, 'private_keyfile', side_effect=[None, prv_keyfile]) \
as mock_keyfile, \
mock.patch.object(
pem_c, 'keygen', side_effect=lambda *_, **__: setattr(
pem_c, '_ApprisePEMController__private_key',
prvkey_ref) or True) as mock_keygen, \
mock.patch.object(
pem_c, 'load_private_key', return_value=True):
result = pem_c.private_key()
assert result is prvkey_ref
assert mock_keyfile.call_count == 2
mock_keygen.assert_called_once()
# - First call: None → triggers keygen
# - Second call (recursive): None → causes fallback
private_keyfile_side_effect = [None, None]
with mock.patch.object(
pem_c, 'private_keyfile',
side_effect=private_keyfile_side_effect) as mock_keyfile, \
mock.patch.object(pem_c, 'keygen', return_value=True) \
as mock_keygen, \
mock.patch.object(pem_c, 'load_private_key', return_value=False) \
as mock_load:
# Ensure no key is preset initially
setattr(pem_c, '_ApprisePEMController__private_key', None)
result = pem_c.private_key()
assert result is None
# Once in outer call, once in recursive
assert mock_keyfile.call_count == 2
mock_keygen.assert_called_once()
mock_load.assert_not_called()
@pytest.mark.skipif(
'cryptography' in sys.modules,
reason="Requires that cryptography NOT be installed")
def test_utils_pem_general_without_c(tmpdir):
"""
Utils:PEM Without cryptography
"""
tmpdir0 = tmpdir.mkdir('tmp00')
# Currently no files here
assert os.listdir(str(tmpdir0)) == []
asset = AppriseAsset(
storage_mode=PersistentStoreMode.MEMORY,
storage_path=str(tmpdir0),
pem_autogen=False,
)
# Create a PEM Controller
pem_c = utils.pem.ApprisePEMController(path=None, asset=asset)
# cryptography library missing poses issues with library useage
with pytest.raises(utils.pem.ApprisePEMException):
pem_c.public_keyfile()
with pytest.raises(utils.pem.ApprisePEMException):
pem_c.public_key()
with pytest.raises(utils.pem.ApprisePEMException):
pem_c.x962_str
with pytest.raises(utils.pem.ApprisePEMException):
pem_c.encrypt("message")
with pytest.raises(utils.pem.ApprisePEMException):
pem_c.keygen()
asset = AppriseAsset(
storage_mode=PersistentStoreMode.FLUSH,
storage_path=str(tmpdir0),
pem_autogen=False,
)
# No new files
assert os.listdir(str(tmpdir0)) == []
# Our asset is now write mode, so we will be able to generate a key
pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
# Nothing to lookup
with pytest.raises(utils.pem.ApprisePEMException):
pem_c.private_keyfile()
with pytest.raises(utils.pem.ApprisePEMException):
pem_c.private_key()
with pytest.raises(utils.pem.ApprisePEMException):
pem_c.x962_str
with pytest.raises(utils.pem.ApprisePEMException):
pem_c.encrypt("message")
# Keys can not be generated in memory mode
with pytest.raises(utils.pem.ApprisePEMException):
pem_c.keygen()
# No files loaded
assert os.listdir(str(tmpdir0)) == []
with pytest.raises(utils.pem.ApprisePEMException):
pem_c.public_keyfile()
with pytest.raises(utils.pem.ApprisePEMException):
pem_c.public_key()
with pytest.raises(utils.pem.ApprisePEMException):
pem_c.x962_str
with pytest.raises(utils.pem.ApprisePEMException):
pem_c.encrypt("message")
with pytest.raises(utils.pem.ApprisePEMException):
pem_c.decrypt("abcd==")