Vapid/WebPush Support

pull/1323/head
Chris Caron 2025-04-26 19:29:30 -04:00
parent ce90151051
commit 5c44e48b64
9 changed files with 2292 additions and 0 deletions

View File

@ -149,6 +149,12 @@ class AppriseAsset:
# if Persistent Storage was set to `memory` # if Persistent Storage was set to `memory`
pgp_autogen = True pgp_autogen = True
# Automatically generate our Privacy Enhanced Mail (PEM) keys if one isn't
# present and our environment configuration allows for it.
# For example, a case where the environment wouldn't allow for it would be
# if Persistent Storage was set to `memory`
pem_autogen = True
# For more detail see CWE-312 @ # For more detail see CWE-312 @
# https://cwe.mitre.org/data/definitions/312.html # https://cwe.mitre.org/data/definitions/312.html
# #

View File

@ -53,6 +53,14 @@ class AppriseDiskIOError(AppriseException):
super().__init__(message, error_code=error_code) super().__init__(message, error_code=error_code)
class AppriseInvalidData(AppriseException):
"""
Thrown when bad data was passed into an internal function
"""
def __init__(self, message, error_code=errno.EINVAL):
super().__init__(message, error_code=error_code)
class AppriseFileNotFound(AppriseDiskIOError, FileNotFoundError): class AppriseFileNotFound(AppriseDiskIOError, FileNotFoundError):
""" """
Thrown when a persistent write occured in MEMORY mode Thrown when a persistent write occured in MEMORY mode

View File

@ -0,0 +1,585 @@
# -*- coding: utf-8 -*-
# BSD 2-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2025, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import os
import requests
from itertools import chain
from json import dumps
from ..base import NotifyBase
from ...common import NotifyType
from ...common import NotifyImageSize
from ...common import PersistentStoreMode
from ...utils.parse import parse_list, parse_bool, is_email
from ...utils.base64 import base64_urlencode
from ...utils import pem as _pem
from ...locale import gettext_lazy as _
import time
# Default our global support flag
NOTIFY_VAPID_SUPPORT_ENABLED = False
try:
from . import subscription
NOTIFY_VAPID_SUPPORT_ENABLED = True
except ImportError:
# cryptography is the dependency of the .subscription library
pass
class VapidPushMode:
"""
Supported Vapid Push Services
"""
CHROME = 'chrome'
FIREFOX = 'firefox'
EDGE = 'edge'
OPERA = 'opera'
VAPID_API_LOOKUP = {
VapidPushMode.CHROME:
'https://fcm.googleapis.com/fcm/send',
VapidPushMode.FIREFOX:
'https://updates.push.services.mozilla.com/wpush/v1',
VapidPushMode.EDGE:
'https://fcm.googleapis.com/fcm/send', # Edge uses FCM too
VapidPushMode.OPERA:
'https://fcm.googleapis.com/fcm/send', # Opera is Chromium-based
}
VAPID_PUSH_MODES = (
VapidPushMode.CHROME,
VapidPushMode.FIREFOX,
VapidPushMode.EDGE,
VapidPushMode.OPERA,
)
class NotifyVapid(NotifyBase):
"""
A wrapper for WebPush/Vapid notifications
"""
# Set our global enabled flag
enabled = NOTIFY_VAPID_SUPPORT_ENABLED and _pem.PEM_SUPPORT
requirements = {
# Define our required packaging in order to work
'packages_required': 'cryptography'
}
# The default descriptive name associated with the Notification
service_name = 'Vapid Web Push Notifications'
# The services URL
service_url = \
'https://datatracker.ietf.org/doc/html/draft-thomson-webpush-vapid'
# The default protocol
secure_protocol = 'vapid'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_vapid'
# There is no reason we should exceed 5KB when reading in a PEM file.
# If it is more than this, then it is not accepted.
max_vapid_keyfile_size = 5000
# There is no reason we should exceed 5MB when reading in a JSON file.
# If it is more than this, then it is not accepted.
max_vapid_subfile_size = 5242880
# The maximum length of the body
body_maxlen = 1024
# Our default is to no not use persistent storage beyond in-memory
# reference; this allows us to auto-generate our config if needed
storage_mode = PersistentStoreMode.AUTO
# 43200 = 12 hours
vapid_jwt_expiration_sec = 43200
# Subscription file
vapid_subscription_file = 'subscriptions.json'
# Allows the user to specify the NotifyImageSize object
image_size = NotifyImageSize.XY_72
# Define object templates
templates = (
'{schema}://{subscriber}',
'{schema}://{subscriber}/{targets}',
)
# Define our template tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'subscriber': {
'name': _('API Key'),
'type': 'string',
'private': True,
'required': True,
},
'targets': {
'name': _('Targets'),
'type': 'list:string',
},
})
# Define our template args
template_args = dict(NotifyBase.template_tokens, **{
'mode': {
'name': _('Mode'),
'type': 'choice:string',
'values': VAPID_PUSH_MODES,
'default': VAPID_PUSH_MODES[0],
'map_to': 'mode',
},
# Default Time To Live (defined in seconds)
# 0 (Zero) - message will be delivered only if the device is reacheable
'ttl': {
'name': _('ttl'),
'type': 'int',
'default': 0,
'min': 0,
'max': 60,
},
'to': {
'alias_of': 'targets',
},
'from': {
'alias_of': 'subscriber',
},
'keyfile': {
# A Private Keyfile is required to sign header
'name': _('PEM Private KeyFile'),
'type': 'string',
'private': True,
},
'subfile': {
# A Subscripion File is required to sign header
'name': _('Subscripion File'),
'type': 'string',
'private': True,
},
'image': {
'name': _('Include Image'),
'type': 'bool',
'default': True,
'map_to': 'include_image',
},
})
def __init__(self, subscriber, mode=None, targets=None, keyfile=None,
subfile=None, include_image=None, ttl=None, **kwargs):
"""
Initialize Vapid Messaging
"""
super().__init__(**kwargs)
# Path to our Private Key file
self.keyfile = None
# Path to our subscription.json file
self.subfile = None
#
# Our Targets
#
self.targets = []
self._invalid_targets = []
# default subscriptions
self.subscriptions = {}
self.subscriptions_loaded = False
# Set our Time to Live Flag
if ttl is None:
self.ttl = self.template_args['ttl']['default']
else:
try:
self.ttl = int(ttl)
except (ValueError, TypeError):
# Do nothing
pass
if self.ttl < self.template_args['ttl']['min'] or \
self.ttl > self.template_args['ttl']['max']:
msg = 'The Vapid TTL specified ({}) is out of range.'\
.format(self.ttl)
self.logger.warning(msg)
raise TypeError(msg)
# Place a thumbnail image inline with the message body
self.include_image = \
self.template_args['image']['default'] \
if include_image is None else include_image
result = is_email(subscriber)
if not result:
msg = 'An invalid Vapid Subscriber' \
'({}) was specified.'.format(subscriber)
self.logger.warning(msg)
raise TypeError(msg)
self.subscriber = result['full_email']
# Store our Mode/service
try:
self.mode = \
NotifyVapid.template_args['mode']['default'] \
if mode is None else mode.lower()
if self.mode not in VAPID_PUSH_MODES:
# allow the outer except to handle this common response
raise
except:
# Invalid region specified
msg = 'The Vapid mode specified ({}) is invalid.' \
.format(mode)
self.logger.warning(msg)
raise TypeError(msg)
# Our Private keyfile
self.keyfile = keyfile
# Our Subscription file
self.subfile = subfile
# Prepare our PEM Object
self.pem = _pem.ApprisePEMController(self.store.path, asset=self.asset)
# Create our subscription object
self.subscriptions = subscription.WebPushSubscriptionManager(
asset=self.asset)
if self.subfile is None and \
self.store.mode != PersistentStoreMode.MEMORY and \
self.asset.pem_autogen:
self.subfile = os.path.join(
self.store.path, self.vapid_subscription_file)
if not os.path.exists(self.subfile) and \
self.subscriptions.write(self.subfile):
self.logger.info(
'Vapid auto-generated %s/%s',
os.path.basename(
self.store.path,
self.vapid_subscription_file))
# Acquire our targets for parsing
self.targets = parse_list(targets)
if not self.targets:
# Add ourselves
self.targets.append(self.subscriber)
return
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Vapid Notification
"""
if not self.pem and not self.pem.load_private_key(self.keyfile):
self.logger.warning(
'Provided Vapid/WebPush (PEM) Private Key file could '
'not be loaded.')
return False
if not self.targets:
# There is no one to notify; we're done
self.logger.warning('There are no Vapid targets to notify')
return False
if not self.subscriptions_loaded and self.subfile:
# Toggle our loaded flag to prevent trying again later
self.subscriptions_loaded = True
if not self.subscriptions.load(
self.subfile, byte_limit=self.max_vapid_subfile_size):
self.logger.warning(
'Provided Vapid/WebPush subscriptions file could not be '
'loaded.')
return False
if not self.subscriptions:
self.logger.warning('Vapid could not load subscriptions')
return False
if not self.pem:
self.logger.warning(
'No Vapid/WebPush (PEM) Private Key file could be loaded.')
return False
# Prepare our notify URL (based on our mode)
notify_url = VAPID_API_LOOKUP[self.mode]
jwt_token = self.jwt_token
if not jwt_token:
self.logger.warning(
'A Vapid JWT Token could not be generated')
return False
headers = {
'User-Agent': self.app_id,
"TTL": str(self.ttl),
"Content-Encoding": "aes128gcm",
"Content-Type": "application/octet-stream",
"Authorization": f"vapid t={jwt_token}, k={self.public_key}",
}
has_error = False
# Create a copy of the targets list
targets = list(self.targets)
while len(targets):
target = targets.pop(0)
if target not in self.subscriptions:
self.logger.warning(
'Dropped Vapid user '
'(%s) specified - not found in subscriptions.json.' %
target,
)
# Save ourselves from doing this again
self._invalid_targets.append(target)
self.targets.remove(target)
has_error = True
continue
# Encrypt our payload
encrypted_payload = self.pem.encrypt_webpush(
body,
public_key=self.subscriptions[target].public_key,
auth_secret=self.subscriptions[target].auth_secret)
self.logger.debug(
'Vapid %s POST URL: %s (cert_verify=%r)',
self.mode, notify_url, self.verify_certificate,
)
self.logger.debug(
'Vapid %s Encrypted Payload: %d byte(s)', self.mode, len(body))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
notify_url,
data=encrypted_payload,
headers=headers,
verify=self.verify_certificate,
timeout=self.request_timeout,
)
if r.status_code not in (
requests.codes.ok, requests.codes.no_content):
# We had a problem
status_str = \
NotifyBase.http_response_code_lookup(r.status_code)
self.logger.warning(
'Failed to send {} Vapid notification: '
'{}{}error={}.'.format(
self.mode,
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n%s', r.content)
has_error = True
else:
self.logger.info('Sent %s Vapid notification.', self.mode)
except requests.RequestException as e:
self.logger.warning(
'A Connection error occurred sending Vapid '
'notification.'
)
self.logger.debug('Socket Exception: %s', str(e))
has_error = True
return not has_error
@property
def url_identifier(self):
"""
Returns all of the identifiers that make this URL unique from
another simliar one. Targets or end points should never be identified
here.
"""
return (self.secure_protocol, self.mode, self.subscriber)
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any URL parameters
params = {
'mode': self.mode,
'ttl': str(self.ttl),
}
if self.keyfile:
# Include our keyfile if specified
params['keyfile'] = self.keyfile
if self.subfile:
# Include our subfile if specified
params['subfile'] = self.subfile
# Extend our parameters
params.update(self.url_parameters(privacy=privacy, *args, **kwargs))
targets = self.targets if not (
self.targets == 1 and
self.targets[0].lower() == self.subscriber.lower()) else []
return '{schema}://{subscriber}/{targets}?{params}'.format(
schema=self.secure_protocol,
subscriber=NotifyVapid.quote(self.subscriber, safe='@'),
targets='/'.join(chain(
[str(t) for t in targets],
[NotifyVapid.quote(x, safe='@')
for x in self._invalid_targets])),
params=NotifyVapid.urlencode(params),
)
def __len__(self):
"""
Returns the number of targets associated with this notification
"""
targets = len(self.targets)
return targets if targets else 1
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results
# Prepare our targets
results['targets'] = []
if 'from' in results['qsd'] and len(results['qsd']['from']):
results['subscriber'] = \
NotifyVapid.unquote(results['qsd']['from'])
if results['user'] and results['host']:
# whatever is left on the URL goes
results['targets'].append('{}@{}'.format(
NotifyVapid.unquote(results['user']),
NotifyVapid.unquote(results['host']),
))
elif results['host']:
results['targets'].append(
NotifyVapid.unquote(results['host']))
else:
# Acquire our subscriber information
results['subscriber'] = '{}@{}'.format(
NotifyVapid.unquote(results['user']),
NotifyVapid.unquote(results['host']),
)
results['targets'].extend(
NotifyVapid.split_path(results['fullpath']))
# Get our mode
results['mode'] = results['qsd'].get('mode')
# Get Image Flag
results['include_image'] = \
parse_bool(results['qsd'].get(
'image', NotifyVapid.template_args['image']['default']))
# The 'to' makes it easier to use yaml configuration
if 'to' in results['qsd'] and len(results['qsd']['to']):
results['targets'] += \
NotifyVapid.parse_list(results['qsd']['to'])
# Our Private Keyfile (PEM)
if 'keyfile' in results['qsd'] and results['qsd']['keyfile']:
results['keyfile'] = \
NotifyVapid.unquote(results['qsd']['keyfile'])
# Our Subscription File (JSON)
if 'subfile' in results['qsd'] and results['qsd']['subfile']:
results['subfile'] = \
NotifyVapid.unquote(results['qsd']['subfile'])
# Support the 'ttl' variable
if 'ttl' in results['qsd'] and len(results['qsd']['ttl']):
results['ttl'] = \
NotifyVapid.unquote(results['qsd']['ttl'])
return results
@property
def jwt_token(self):
"""
Returns our VAPID Token based on class details
"""
# JWT header
header = {
"alg": "ES256",
"typ": "JWT"
}
# JWT payload
payload = {
"aud": VAPID_API_LOOKUP[self.mode],
"exp": int(time.time()) + self.vapid_jwt_expiration_sec,
"sub": f"mailto:{self.subscriber}"
}
# Base64 URL encode header and payload
header_b64 = base64_urlencode(
dumps(header, separators=(",", ":")).encode('utf-8'))
payload_b64 = base64_urlencode(
dumps(payload, separators=(",", ":")).encode('utf-8'))
signing_input = f"{header_b64}.{payload_b64}".encode('utf-8')
signature_b64 = base64_urlencode(self.pem.sign(signing_input))
# Return final token
return f"{header_b64}.{payload_b64}.{signature_b64}"
@property
def public_key(self):
"""
Returns our public key representation
"""
return self.pem.x962_str

View File

@ -0,0 +1,414 @@
# -*- coding: utf-8 -*-
# BSD 2-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2025, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import json
from ...asset import AppriseAsset
from ...utils.base64 import base64_urldecode
from ...exception import AppriseInvalidData
from ...apprise_attachment import AppriseAttachment
from cryptography.hazmat.primitives.asymmetric import ec
class WebPushSubscription:
"""
WebPush Subscription
"""
# Format:
# {
# "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...",
# "keys": {
# "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...",
# "auth": "k9Xzm43nBGo=",
# }
# }
def __init__(self, content: str | dict | None = None):
"""
Prepares a webpush object provided with content
Content can be a dictionary, or JSON String
"""
# Our variables
self.__endpoint = None
self.__p256dh = None
self.__auth = None
self.__auth_secret = None
self.__public_key = None
if content is not None:
if not self.load(content):
raise AppriseInvalidData('Could not load subscription')
def load(self, content: str | dict | None = None):
"""
Performs the loading/validation of the object
"""
# Reset our variables
self.__endpoint = None
self.__p256dh = None
self.__auth = None
self.__auth_secret = None
self.__public_key = None
if isinstance(content, str):
try:
content = json.loads(content)
except json.decoder.JSONDecodeError:
# Bad data
return False
if not isinstance(content, dict):
# We could not load he result set
return False
# Retreive our contents for validation
endpoint = content.get('endpoint')
if not isinstance(endpoint, str):
return False
try:
p256dh = base64_urldecode(content['keys']['p256dh'])
if not p256dh:
return False
auth_secret = base64_urldecode(content['keys']['auth'])
if not auth_secret:
return False
except KeyError:
return False
try:
# Store our data
self.__public_key = ec.EllipticCurvePublicKey.from_encoded_point(
ec.SECP256R1(), p256dh,
)
except ValueError:
# Invalid p256dh key (Can't load Public Key)
return False
self.__endpoint = endpoint
self.__p256dh = content['keys']['p256dh']
self.__auth = content['keys']['auth']
self.__auth_secret = auth_secret
return True
def write(self, path: str, indent: int = 2) -> bool:
"""
Writes content to disk based on path specified. Content is a JSON
file, so ideally you may wish to have `.json' as it's extension for
clarity
"""
if not self.__public_key:
return False
try:
with open(path, 'w', encoding='utf-8') as f:
json.dump(self.dict, f, indent=indent)
except (TypeError, OSError):
# Could not write content
return False
return True
@property
def auth(self) -> str | None:
return self.__auth if self.__public_key else None
@property
def endpoint(self) -> str | None:
return self.__endpoint if self.__public_key else None
@property
def p256dh(self) -> str | None:
return self.__p256dh if self.__public_key else None
@property
def auth_secret(self) -> bytes | None:
return self.__auth_secret if self.__public_key else None
@property
def public_key(self) -> ec.EllipticCurvePublicKey | None:
return self.__public_key
@property
def dict(self) -> dict:
return {
"endpoint": self.__endpoint,
"keys": {
"p256dh": self.__p256dh,
"auth": self.__auth,
},
} if self.__public_key else {
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123...',
"keys": {
"p256dh": '<place public key in base64 here>',
"auth": '<place auth in base64 here>',
},
}
def json(self, indent: int = 2) -> str:
"""
Returns JSON representation of the object
"""
return json.dumps(self.dict, indent=indent)
def __bool__(self) -> bool:
"""
handle 'if' statement
"""
return True if self.__public_key else False
def __str__(self) -> str:
"""
Returns our JSON entry as a string
"""
# Return the first 16 characters of the detected endpoint subscription
# id
return '' if not self.__endpoint \
else self.__endpoint.split('/')[-1][:16]
class WebPushSubscriptionManager:
"""
WebPush Subscription Manager
"""
# Format:
# {
# "name1": {
# "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...",
# "keys": {
# "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...",
# "auth": "k9Xzm43nBGo=",
# }
# },
# "name2": {
# "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...",
# "keys": {
# "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...",
# "auth": "k9Xzm43nBGo=",
# }
# },
# Defines the number of failures we can accept before we abort and assume
# the file is bad
max_load_failure_count = 3
def __init__(self, asset=None):
"""
Webpush Subscription Manager
"""
# Our subscriptions
self.__subscriptions = {}
# Prepare our Asset Object
self.asset = \
asset if isinstance(asset, AppriseAsset) else AppriseAsset()
def __getitem__(self, key: str) -> int:
"""
Returns our indexed value if it exists
"""
return self.__subscriptions[key.lower()]
def __setitem__(self, name: str,
subscription: WebPushSubscription | str | dict) -> None:
"""
Set's our object if possible
"""
if not self.add(subscription, name=name.lower()):
raise AppriseInvalidData('Invalid subscription provided')
def add(self,
subscription: WebPushSubscription | str | dict,
name: str | None = None) -> bool:
"""
Add a subscription into our manager
"""
if not isinstance(subscription, WebPushSubscription):
try:
# Support loading our object
subscription = WebPushSubscription(subscription)
except AppriseInvalidData:
return True
if name is None:
name = str(subscription)
self.__subscriptions[name.lower()] = subscription
return True
def __bool__(self) -> bool:
"""
True is returned if at least one subscription has been loaded.
"""
return True if self.__subscriptions else False
def __len__(self) -> int:
"""
Returns the number of servers loaded; this includes those found within
loaded configuration. This funtion nnever actually counts the
Config entry themselves (if they exist), only what they contain.
"""
return len(self.__subscriptions)
def __iadd__(self, subscription: WebPushSubscription |
str | dict) -> "WebPushSubscriptionManager":
if not self.add(subscription):
raise AppriseInvalidData('Invalid subscription provided')
return self
def __contains__(self, key: str) -> bool:
"""
Checks if the key exists
"""
return key.lower() in self.__subscriptions
def clear(self) -> None:
"""
Empties our server list
"""
self.__subscriptions.clear()
@property
def dict(self) -> dict:
"""
Returns a dictionary of all entries
"""
return {k: v.dict for k, v in self.__subscriptions.items()} \
if self.__subscriptions else {}
def load(self, path: str, byte_limit=0) -> bool:
"""
Writes content to disk based on path specified. Content is a JSON
file, so ideally you may wish to have `.json' as it's extension for
clarity
if byte_limit is zero, then we do not limit our file size, otherwise
set this to the bytes you want to restrict yourself by
"""
# Reset our object
self.clear()
# Create our attachment object
attach = AppriseAttachment(asset=self.asset)
# Add our path
attach.add(path)
if byte_limit > 0:
# Enforce maximum file size
attach[0].max_file_size = byte_limit
if not path:
return False
try:
# Otherwise open our path
with open(attach[0].path, 'r', encoding='utf-8') as f:
content = json.load(f)
except (TypeError, OSError):
# Could not read
return False
# Verify if we're dealing with a single element:
# {
# "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...",
# "keys": {
# "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...",
# "auth": "k9Xzm43nBGo=",
# }
# }
#
# or if we're dealing with a multiple set
#
# {
# "name1": {
# "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...",
# "keys": {
# "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...",
# "auth": "k9Xzm43nBGo=",
# }
# },
# "name2": {
# "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...",
# "keys": {
# "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...",
# "auth": "k9Xzm43nBGo=",
# }
# },
error_count = 0
if 'endpoint' in content and 'keys' in content:
if not self.add(content):
return False
else:
for name, subscription in content.items():
if not self.add(subscription, name=name.lower()):
error_count += 1
if error_count > self.max_load_failure_count:
self.clear()
return False
return True
def write(self, path: str, indent: int = 2) -> bool:
"""
Writes content to disk based on path specified. Content is a JSON
file, so ideally you may wish to have `.json' as it's extension for
clarity
"""
try:
with open(path, 'w', encoding='utf-8') as f:
json.dump(self.dict, f, indent=indent)
except (TypeError, OSError):
# Could not write content
return False
return True
def json(self, indent: int = 2) -> str:
"""
Returns JSON representation of the object
"""
return json.dumps(self.dict, indent=indent)

View File

@ -32,6 +32,33 @@ import typing
import base64 import base64
def base64_urlencode(data: bytes) -> str:
"""
URL Safe Base64 Encoding
"""
try:
return base64.urlsafe_b64encode(data).rstrip(b'=').decode('utf-8')
except TypeError:
# data is not supported; avoid raising exception
return None
def base64_urldecode(data: str) -> bytes:
"""
URL Safe Base64 Encoding
"""
try:
# Normalize base64url string (remove padding, add it back)
padding = '=' * (-len(data) % 4)
return base64.urlsafe_b64decode(data + padding)
except TypeError:
# data is not supported; avoid raising exception
return None
def decode_b64_dict(di: dict) -> dict: def decode_b64_dict(di: dict) -> dict:
""" """
decodes base64 dictionary previously encoded decodes base64 dictionary previously encoded

740
apprise/utils/pem.py Normal file
View File

@ -0,0 +1,740 @@
# -*- coding: utf-8 -*-
# BSD 2-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2025, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import os
import json
import base64
import struct
from ..utils.base64 import base64_urlencode, base64_urldecode
from ..apprise_attachment import AppriseAttachment
from ..asset import AppriseAsset
from ..logger import logger
from ..exception import ApprisePluginException
try:
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.ciphers import (
Cipher, algorithms, modes)
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.serialization import (
Encoding,
NoEncryption,
PrivateFormat,
PublicFormat,
)
from cryptography.hazmat.primitives.asymmetric.utils import (
decode_dss_signature
)
# PEM Support enabled
PEM_SUPPORT = True
except ImportError:
# PEM Support disabled
PEM_SUPPORT = False
class ApprisePEMException(ApprisePluginException):
"""
Thrown when there is an error with the PEM Controller
"""
def __init__(self, message, error_code=612):
super().__init__(message, error_code=error_code)
class ApprisePEMController:
"""
PEM Controller Tool for the Apprise Library
"""
# There is no reason a PEM Public Key should exceed 8K in size
# If it is more than this, then it is not accepted
max_pem_public_key_size = 8000
# There is no reason a PEM Private Key should exceed 8K in size
# If it is more than this, then it is not accepted
max_pem_private_key_size = 8000
# Maximum Vapid Message Size
max_webpush_record_size = 4096
def __init__(self, path, pub_keyfile=None, prv_keyfile=None,
name=None, asset=None, **kwargs):
"""
Path should be the directory keys can be written and read from such as
<notifyobject>.store.path
Optionally additionally specify a keyfile to explicitly open
"""
# Directory we can work with
self.path = path
# Prepare our Key Placeholders
self.__private_key = None
self.__public_key = None
# Our name (id)
self.name = name.strip(' \t/-+!$@#*').lower() \
if isinstance(name, str) else ''
# Prepare our Asset Object
self.asset = \
asset if isinstance(asset, AppriseAsset) else AppriseAsset()
# Our temporary reference points
self._prv_keyfile = AppriseAttachment(asset=self.asset)
self._pub_keyfile = AppriseAttachment(asset=self.asset)
if prv_keyfile:
self.load_private_key(prv_keyfile)
elif pub_keyfile:
self.load_public_key(pub_keyfile)
else:
self._pub_keyfile = None
def load_private_key(self, path=None, *names):
"""
Load Private key and from that we can prepare our public key
"""
if path is None:
# Auto-load our content
return True if self.private_keyfile(*names) else False
# Create ourselves an Attachment to work with; this grants us the
# ability to pull this key from a remote site or anything else
# supported by the Attachment object
self._prv_keyfile = AppriseAttachment(asset=self.asset)
# Add our definition to our pem_key reference
self._prv_keyfile.add(path)
# Enforce maximum file size
self._prv_keyfile[0].max_file_size = self.max_pem_private_key_size
#
# Reset Public key
#
self._pub_keyfile = AppriseAttachment(asset=self.asset)
#
# Reset our internal keys
#
self.__private_key = None
self.__public_key = None
if not self._prv_keyfile:
# Early exit
logger.error(
'Could not access PEM Private Key {}.'.format(path))
return False
try:
with open(self._prv_keyfile[0].path, "rb") as f:
self.__private_key = serialization.load_pem_private_key(
f.read(),
password=None, # or provide the password if encrypted
backend=default_backend()
)
except FileNotFoundError:
# Generate keys
logger.debug('PEM Private Key file not found: %s', path)
return False
except OSError as e:
logger.warning('Error accessing PEM Private Key file %s', path)
logger.debug(f'I/O Exception: {e}')
return False
#
# Generate our public key
#
self.__public_key = self.__private_key.public_key()
# Load our private key
return True if self.__private_key else False
def load_public_key(self, path=None, *names):
"""
Load Public key only
Note: with just a public key you can only decrypt, encryption is not
possible.
"""
if path is None:
# Auto-load our content
return True if self.public_keyfile(*names) else False
# Create ourselves an Attachment to work with; this grants us the
# ability to pull this key from a remote site or anything else
# supported by the Attachment object
self._pub_keyfile = AppriseAttachment(asset=self.asset)
# Add our definition to our pem_key reference
self._pub_keyfile.add(path)
# Enforce maximum file size
self._pub_keyfile[0].max_file_size = self.max_pem_public_key_size
#
# Reset Private key
#
self._prv_keyfile = AppriseAttachment(asset=self.asset)
#
# Reset our internal keys
#
self.__private_key = None
self.__public_key = None
if not self._pub_keyfile:
# Early exit
logger.error(
'Could not access PEM Public Key {}.'.format(path))
return False
try:
with open(path, 'rb') as key_file:
self.__public_key = serialization.load_pem_public_key(
key_file.read(),
backend=default_backend()
)
except FileNotFoundError:
# Generate keys
logger.debug('PEM Public Key file not found: %s', path)
return None
except OSError as e:
logger.warning('Error accessing PEM Public Key file %s', path)
logger.debug(f'I/O Exception: {e}')
return None
# Load our private key
return True if self.__public_key else False
def keygen(self, name=None, force=False):
"""
Generates a set of keys based on name configured.
"""
if self._pub_keyfile or not self.path:
logger.trace(
'PEM keygen disabled, reason=%s',
'keyfile-defined' if not self._pub_keyfile
else 'no-write-path')
return False
# Create a new private/public key pair
self.__private_key = ec.generate_private_key(
ec.SECP256R1(), default_backend())
self.__public_key = self.__private_key.public_key()
#
# Prepare our PEM formatted output files
#
private_key = self.__private_key.private_bytes(
Encoding.PEM,
PrivateFormat.PKCS8,
encryption_algorithm=NoEncryption(),
)
public_key = self.__public_key.public_bytes(
encoding=Encoding.PEM,
format=PublicFormat.SubjectPublicKeyInfo,
)
if not name:
name = self.name
file_prefix = '' if not name else f'{name}-'
pub_path = os.path.join(self.path, f'{file_prefix}public_key.pem')
prv_path = os.path.join(self.path, f'{file_prefix}private_key.pem')
if os.path.isfile(pub_path) and not force:
logger.debug(
'PEM generation skipped; Public Key already exists: %s',
pub_path)
return True
try:
# Write our keys to disk
with open(pub_path, 'wb') as f:
f.write(public_key)
except OSError as e:
logger.warning('Error writing Public PEM file %s', pub_path)
logger.debug(f'I/O Exception: {e}')
# Cleanup
try:
os.unlink(pub_path)
logger.trace('Removed %s', pub_path)
except OSError:
pass
try:
with open(prv_path, 'wb') as f:
f.write(private_key)
except OSError as e:
logger.warning('Error writing Private PEM file %s', prv_path)
logger.debug(f'I/O Exception: {e}')
try:
os.unlink(pub_path)
logger.trace('Removed %s', pub_path)
except OSError:
pass
try:
os.unlink(prv_path)
logger.trace('Removed %s', prv_path)
except OSError:
pass
return False
logger.info(
'Wrote Public/Private PEM key pair for %s/%s',
os.path.dirname(pub_path),
os.path.basename(pub_path))
return True
def public_keyfile(self, *names):
"""
Returns the first match of a useable public key based names provided
"""
if not PEM_SUPPORT:
msg = 'PEM Support unavailable; install cryptography library'
logger.warning(msg)
raise ApprisePEMException(msg)
if self._pub_keyfile:
# If our code reaches here, then we fetch our public key
pem_key = self._pub_keyfile[0]
if not pem_key:
# We could not access the attachment
logger.error(
'Could not access PEM Public Key {}.'.format(
pem_key.url(privacy=True)))
return False
return pem_key.path
elif not self.path:
# No path
return None
fnames = [
'public_key.pem',
'public.pem',
'pub.pem',
]
if self.name:
# Include our name in the list
fnames = [self.name] + [*names]
for name in names:
fnames.insert(0, f'{name}-public_key.pem')
_entry = name.lower()
fnames.insert(0, f'{_entry}-public_key.pem')
return next(
(os.path.join(self.path, fname)
for fname in fnames
if os.path.isfile(os.path.join(self.path, fname))),
None)
def private_keyfile(self, *names):
"""
Returns the first match of a useable private key based names provided
"""
if not PEM_SUPPORT:
msg = 'PEM Support unavailable; install cryptography library'
logger.warning(msg)
raise ApprisePEMException(msg)
if self._prv_keyfile:
# If our code reaches here, then we fetch our private key
pem_key = self._prv_keyfile[0]
if not pem_key:
# We could not access the attachment
logger.error(
'Could not access PEM Private Key {}.'.format(
pem_key.url(privacy=True)))
return False
return pem_key.path
elif not self.path:
# No path
return None
fnames = [
'private_key.pem',
'private.pem',
'prv.pem',
]
if self.name:
# Include our name in the list
fnames = [self.name] + [*names]
for name in names:
fnames.insert(0, f'{name}-private_key.pem')
_entry = name.lower()
fnames.insert(0, f'{_entry}-private_key.pem')
return next(
(os.path.join(self.path, fname)
for fname in fnames
if os.path.isfile(os.path.join(self.path, fname))),
None)
def public_key(self, *names, autogen=None):
"""
Opens a spcified pem public file and returns the key from it which
is used to decrypt the message
"""
if self.__public_key:
return self.__public_key
path = self.public_keyfile(*names)
if not path:
if (autogen if autogen is not None else self.asset.pem_autogen) \
and self.keygen(*names):
path = self.public_keyfile(*names)
if path:
# We should get a hit now
return self.public_key(*names)
logger.warning('No PEM Public Key could be loaded')
return None
return self.__public_key if (
self.load_public_key(path) or
# Try to see if we can load a private key (which we can generate a
# public from)
self.private_key(names=names, autogen=autogen)) else None
def private_key(self, *names, autogen=None):
"""
Opens a spcified pem private file and returns the key from it which
is used to encrypt the message
"""
if self.__private_key:
return self.__private_key
path = self.private_keyfile(*names)
if not path:
if (autogen if autogen is not None else self.asset.pem_autogen) \
and self.keygen(*names):
path = self.private_keyfile(*names)
if path:
# We should get a hit now
return self.private_key(*names)
logger.warning('No PEM Private Key could be loaded')
return None
return self.__private_key if self.load_private_key(path) else None
def encrypt_webpush(self, message: str | bytes,
# Information required
public_key: ec.EllipticCurvePublicKey,
auth_secret: bytes) -> bytes:
"""
Encrypt a WebPush message using the recipient's public key and auth
secret.
Accepts input message as str or bytes.
"""
if isinstance(message, str):
message = message.encode('utf-8')
# 1. Generate ephemeral EC private/Public key
ephemeral_private_key = \
ec.generate_private_key(ec.SECP256R1(), default_backend())
ephemeral_public_key = ephemeral_private_key.public_key().public_bytes(
encoding=Encoding.X962, format=PublicFormat.UncompressedPoint)
# 2. Random salt
salt = os.urandom(16)
# 3. Generate shared secret via ECDH
shared_secret = ephemeral_private_key.exchange(ec.ECDH(), public_key)
# 4. Derive PRK using HKDF (first phase)
recipient_public_key_bytes = public_key.public_bytes(
encoding=Encoding.X962,
format=PublicFormat.UncompressedPoint,
)
# 5. Derive Encryption key
hkdf_secret = HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=auth_secret,
info=b"WebPush: info\x00" +
recipient_public_key_bytes + ephemeral_public_key,
backend=default_backend(),
).derive(shared_secret)
# 6. Derive Content Encryption Key
hkdf_key = HKDF(
algorithm=hashes.SHA256(),
length=16,
salt=salt,
info=b"Content-Encoding: aes128gcm\x00",
backend=default_backend(),
).derive(hkdf_secret)
# 7. Derive Nonce
hkdf_nonce = HKDF(
algorithm=hashes.SHA256(),
length=12,
salt=salt,
info=b"Content-Encoding: nonce\x00",
backend=default_backend(),
).derive(hkdf_secret)
# 8. Encrypt the message
aesgcm = AESGCM(hkdf_key)
# RFC8291 requires us to add '\0x02' byte to end of message
ciphertext = aesgcm.encrypt(
hkdf_nonce, message + b"\x02", associated_data=None)
# 9. Build WebPush header + payload
header = salt
header += struct.pack("!L", self.max_webpush_record_size)
header += struct.pack("!B", len(ephemeral_public_key))
header += ephemeral_public_key
header += ciphertext
return header
def encrypt(self,
message: str | bytes,
public_key: ec.EllipticCurvePublicKey | None = None,
salt: bytes | None = None) -> str | None:
"""
Encrypts a message using the recipient's public key (or self public
key if none provided). Message can be str or bytes.
"""
# 1. Handle string vs bytes input
if isinstance(message, str):
message = message.encode('utf-8')
# 2. Select public key
if public_key is None:
public_key = self.public_key()
if public_key is None:
logger.debug("No public key available for encryption.")
return None
# 3. Generate ephemeral EC private key
ephemeral_private_key = ec.generate_private_key(ec.SECP256R1())
# 4. Derive shared secret
shared_secret = ephemeral_private_key.exchange(ec.ECDH(), public_key)
# 5. Derive symmetric AES key using HKDF
derived_key = HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=salt, # Allow salt=None if not provided
info=b'ecies-encryption',
backend=default_backend()
).derive(shared_secret)
# 6. Encrypt the message using AES-GCM
iv = os.urandom(12) # 96-bit random IV for GCM
encryptor = Cipher(
algorithms.AES(derived_key),
modes.GCM(iv),
backend=default_backend()
).encryptor()
ciphertext = encryptor.update(message) + encryptor.finalize()
tag = encryptor.tag
# 7. Serialize ephemeral public key as X9.62 Uncompressed Point
ephemeral_public_key_bytes = \
ephemeral_private_key.public_key().public_bytes(
encoding=serialization.Encoding.X962,
format=serialization.PublicFormat.UncompressedPoint,
)
# 8. Combine everything cleanly
full_payload = {
"ephemeral_pubkey": base64_urlencode(ephemeral_public_key_bytes),
"iv": base64_urlencode(iv),
"tag": base64_urlencode(tag),
"ciphertext": base64_urlencode(ciphertext),
}
return base64.b64encode(
json.dumps(full_payload).encode('utf-8')
).decode('utf-8')
def decrypt(self,
encrypted_payload: str,
private_key: ec.EllipticCurvePrivateKey | None = None,
salt: bytes | None = None) -> str | None:
"""
Decrypts a message using the provided private key or fallback to
self's private key.
Payload is the base64-encoded JSON from encrypt().
"""
# 1. Parse input
if isinstance(encrypted_payload, str):
payload_bytes = base64.b64decode(encrypted_payload.encode('utf-8'))
else:
payload_bytes = base64.b64decode(encrypted_payload)
payload = json.loads(payload_bytes.decode('utf-8'))
ephemeral_pubkey_bytes = base64_urldecode(payload["ephemeral_pubkey"])
iv = base64_urldecode(payload["iv"])
tag = base64_urldecode(payload["tag"])
ciphertext = base64_urldecode(payload["ciphertext"])
# 2. Select private key
if private_key is None:
private_key = self.private_key()
if private_key is None:
logger.debug("No private key available for decryption.")
return None
# 3. Load ephemeral public key from sender
ephemeral_pubkey = ec.EllipticCurvePublicKey.from_encoded_point(
ec.SECP256R1(),
ephemeral_pubkey_bytes
)
# 4. ECDH shared secret
shared_secret = private_key.exchange(ec.ECDH(), ephemeral_pubkey)
# 5. Derive symmetric AES key with HKDF
derived_key = HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
info=b'ecies-encryption',
).derive(shared_secret)
# 6. Decrypt using AES-GCM
decryptor = Cipher(
algorithms.AES(derived_key),
modes.GCM(iv, tag),
).decryptor()
plaintext = decryptor.update(ciphertext) + decryptor.finalize()
# 7. Return decoded message
return plaintext.decode('utf-8')
def sign(self, content):
"""
Sign the message using ES256 (ECDSA w/ SHA256) via private key
"""
try:
# Sign the message using ES256 (ECDSA w/ SHA256)
der_sig = self.private_key()\
.sign(content, ec.ECDSA(hashes.SHA256()))
except AttributeError:
# NoneType; could not load key
return None
# Convert DER to raw R||S
r, s = decode_dss_signature(der_sig)
return r.to_bytes(
32, byteorder='big') + s.to_bytes(32, byteorder='big')
@property
def pub_keyfile(self):
"""
Returns the Public Keyfile Path if set otherwise it returns None
This property returns False if a keyfile was provided, but was invalid
"""
return None if not self._pub_keyfile \
else (False if not self._pub_keyfile[0]
else self._pub_keyfile[0].path)
@property
def prv_keyfile(self):
"""
Returns the Privat Keyfile Path if set otherwise it returns None
This property returns False if a keyfile was provided, but was invalid
"""
return None if not self._prv_keyfile \
else (False if not self._prv_keyfile[0]
else self._prv_keyfile[0].path)
@property
def x962_str(self):
"""
X962 serialization based on public key
"""
try:
return base64_urlencode(
self.public_key().public_bytes(
encoding=serialization.Encoding.X962,
format=serialization.PublicFormat.UncompressedPoint)
)
except AttributeError:
# Public Key could not be generated (public_key() returned None)
return ''
def __bool__(self):
"""
Returns True if at least 1 key was loaded
"""
return True if (self.public_key() or self.private_key()) else False

View File

@ -2752,6 +2752,30 @@ def test_cwe312_url():
'#random') == 'slack://test@B...4/J...M/X...3/' '#random') == 'slack://test@B...4/J...M/X...3/'
def test_base64_encode_decode():
"""
Utils:Base64:URLEncode & Decode
"""
assert utils.base64.base64_urlencode(None) is None
assert utils.base64.base64_urlencode(42) is None
assert utils.base64.base64_urlencode(object) is None
assert utils.base64.base64_urlencode({}) is None
assert utils.base64.base64_urlencode("") is None
assert utils.base64.base64_urlencode("abc") is None
assert utils.base64.base64_urlencode(b"") == ''
assert utils.base64.base64_urlencode(b"abc") == 'YWJj'
assert utils.base64.base64_urldecode(None) is None
assert utils.base64.base64_urldecode(42) is None
assert utils.base64.base64_urldecode(object) is None
assert utils.base64.base64_urldecode({}) is None
assert utils.base64.base64_urldecode("abc") == b'i\xb7'
assert utils.base64.base64_urldecode("") == b''
assert utils.base64.base64_urldecode('YWJj') == b'abc'
def test_dict_base64_codec(tmpdir): def test_dict_base64_codec(tmpdir):
""" """
Test encoding/decoding of base64 content Test encoding/decoding of base64 content

390
test/test_plugin_vapid.py Normal file
View File

@ -0,0 +1,390 @@
# -*- coding: utf-8 -*-
# BSD 2-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2025, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import os
import json
import requests
import pytest
from unittest import mock
from apprise.plugins.vapid.subscription import (
WebPushSubscription, WebPushSubscriptionManager)
from apprise.plugins.vapid import NotifyVapid
from apprise import exception, asset, url
from apprise.common import PersistentStoreMode
from apprise.utils.pem import ApprisePEMController
from helpers import AppriseURLTester
# Disable logging for a cleaner testing output
import logging
logging.disable(logging.CRITICAL)
# Attachment Directory
TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var')
# a test UUID we can use
SUBSCRIBER = 'user@example.com'
PLUGIN_ID = 'vapid'
# Our Testing URLs
apprise_url_tests = (
('vapid://', {
'instance': TypeError,
}),
('vapid://:@/', {
'instance': TypeError,
}),
('vapid://invalid-subscriber', {
# An invalid Subscriber
'instance': TypeError,
}),
('vapid://user@example.com', {
# bare bone requirements met, but we don't have our subscription file
# or our private key (pem)
'instance': NotifyVapid,
# We'll fail to respond because we would not have found any
# configuration to load
'notify_response': False,
}),
('vapid://user@example.com/newuser@example.com', {
# we don't have our subscription file or private key
'instance': NotifyVapid,
'notify_response': False,
}),
('vapid://user@example.ca/newuser@example.ca', {
'instance': NotifyVapid,
# force a failure
'response': False,
'requests_response_code': requests.codes.internal_server_error,
}),
('vapid://user@example.uk/newuser@example.uk', {
'instance': NotifyVapid,
# throw a bizzare code forcing us to fail to look it up
'response': False,
'requests_response_code': 999,
}),
('vapid://user@example.au/newuser@example.au', {
'instance': NotifyVapid,
# Throws a series of connection and transfer exceptions when this flag
# is set and tests that we gracfully handle them
'test_requests_exceptions': True,
}),
)
@pytest.fixture
def patch_persistent_store_namespace(tmpdir):
"""
Force an easy to test environment
"""
with mock.patch.object(url.URLBase, 'url_id', return_value=PLUGIN_ID), \
mock.patch.object(
asset.AppriseAsset, 'storage_mode',
PersistentStoreMode.AUTO), \
mock.patch.object(
asset.AppriseAsset, 'storage_path', str(tmpdir)):
tmp_dir = tmpdir.mkdir(PLUGIN_ID)
# Return the directory name
yield str(tmp_dir)
@pytest.fixture
def subscription_reference():
return {
"user@example.com": {
"endpoint": 'https://fcm.googleapis.com/fcm/send/default',
"keys": {
"p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
'5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
"auth": 'k9Xzm43nBGo=',
},
},
"user1": {
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
'5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
"auth": 'k9Xzm43nBGo=',
},
},
"user2": {
"endpoint": 'https://fcm.googleapis.com/fcm/send/def456',
"keys": {
"p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
'5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
"auth": 'k9Xzm43nBGo=',
},
},
}
def test_plugin_vapid_urls():
"""
NotifyVapid() Apprise URLs - No Config
"""
# Run our general tests
AppriseURLTester(tests=apprise_url_tests).run_all()
def test_plugin_vapid_urls_with_required_assets(
patch_persistent_store_namespace, subscription_reference):
"""
NotifyVapid() Apprise URLs With Config
"""
# Determine our store
pc = ApprisePEMController(path=patch_persistent_store_namespace)
assert pc.keygen() is True
# Write our subscriptions file to disk
subscription_file = os.path.join(
patch_persistent_store_namespace,
NotifyVapid.vapid_subscription_file)
with open(subscription_file, 'w') as f:
f.write(json.dumps(subscription_reference))
tests = (
('vapid://user@example.com', {
# user@example.com loaded (also used as subscriber id)
'instance': NotifyVapid,
}),
('vapid://user@example.com/newuser@example.com', {
# no newuser@example.com key entry
'instance': NotifyVapid,
'notify_response': False,
}),
('vapid://user@example.com/user1?to=user2', {
# We'll succesfully notify 2 users
'instance': NotifyVapid,
}),
('vapid://user@example.com/default', {
'instance': NotifyVapid,
# force a failure
'response': False,
'requests_response_code': requests.codes.internal_server_error,
}),
('vapid://user@example.com/newuser@example.uk', {
'instance': NotifyVapid,
# throw a bizzare code forcing us to fail to look it up
'response': False,
'requests_response_code': 999,
}),
('vapid://user@example.com/newuser@example.au', {
'instance': NotifyVapid,
# Throws a series of connection and transfer exceptions
# when this flag is set and tests that we gracfully handle them
'test_requests_exceptions': True,
}),
)
AppriseURLTester(tests=tests).run_all()
def test_plugin_vapid_subscriptions(tmpdir):
"""
NotifyVapid() Subscriptions
"""
# Temporary directory
tmpdir0 = tmpdir.mkdir('tmp00')
with pytest.raises(exception.AppriseInvalidData):
# Integer not supported
WebPushSubscription(42)
with pytest.raises(exception.AppriseInvalidData):
# Not the correct format
WebPushSubscription('bad-content')
with pytest.raises(exception.AppriseInvalidData):
# Invalid JSON
WebPushSubscription('{')
with pytest.raises(exception.AppriseInvalidData):
# Empty Dictionary
WebPushSubscription({})
with pytest.raises(exception.AppriseInvalidData):
WebPushSubscription({
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 'BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR=',
"auth": 42,
},
})
with pytest.raises(exception.AppriseInvalidData):
WebPushSubscription({
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 42,
"auth": 'k9Xzm43nBGo=',
},
})
with pytest.raises(exception.AppriseInvalidData):
WebPushSubscription({
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
})
with pytest.raises(exception.AppriseInvalidData):
WebPushSubscription({
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {},
})
with pytest.raises(exception.AppriseInvalidData):
# Invalid p256dh public key provided
wps = WebPushSubscription({
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 'BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR=',
"auth": 'k9Xzm43nBGo=',
},
})
# An empty object
wps = WebPushSubscription()
assert bool(wps) is False
assert isinstance(wps.json(), str)
assert json.loads(wps.json())
assert str(wps) == ''
assert wps.auth is None
assert wps.endpoint is None
assert wps.p256dh is None
assert wps.public_key is None
# We can't write anything as there is nothing loaded
assert wps.write(os.path.join(str(tmpdir0), 'subscriptions.json')) is False
# A valid key
wps = WebPushSubscription({
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
'5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
"auth": 'k9Xzm43nBGo=',
},
})
assert bool(wps) is True
assert isinstance(wps.json(), str)
assert json.loads(wps.json())
assert str(wps) == 'abc123'
assert wps.auth == 'k9Xzm43nBGo='
assert wps.endpoint == 'https://fcm.googleapis.com/fcm/send/abc123'
assert wps.p256dh == 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO' \
'5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0'
assert wps.public_key is not None
# Currently no files here
assert os.listdir(str(tmpdir0)) == []
# Bad content
assert wps.write(object) is False
assert wps.write(None) is False
# Can't write to a name already taken by as a directory
assert wps.write(str(tmpdir0)) is False
# Can't write to a name already taken by as a directory
assert wps.write(os.path.join(str(tmpdir0), 'subscriptions.json')) is True
assert os.listdir(str(tmpdir0)) == ['subscriptions.json']
def test_plugin_vapid_subscription_manager(tmpdir):
"""
NotifyVapid() Subscription Manager
"""
# Temporary directory
tmpdir0 = tmpdir.mkdir('tmp00')
smgr = WebPushSubscriptionManager()
assert bool(smgr) is False
assert len(smgr) == 0
sub = {
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
'5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
"auth": 'k9Xzm43nBGo=',
},
}
assert smgr.add(sub) is True
assert bool(smgr) is True
assert len(smgr) == 1
# Same sub (overwrites same slot)
smgr += sub
assert bool(smgr) is True
assert len(smgr) == 1
# indexed by value added
smgr['abc123'] = sub
assert bool(smgr) is True
assert len(smgr) == 1
assert isinstance(smgr['abc123'], WebPushSubscription)
# Currently no files here
assert os.listdir(str(tmpdir0)) == []
# Write our content
assert smgr.write(
os.path.join(str(tmpdir0), 'subscriptions.json')) is True
assert os.listdir(str(tmpdir0)) == ['subscriptions.json']
# Reset our object
smgr.clear()
assert bool(smgr) is False
assert len(smgr) == 0
# Load our content back
assert smgr.load(
os.path.join(str(tmpdir0), 'subscriptions.json')) is True
assert bool(smgr) is True
assert len(smgr) == 1
# Write over our file using the standard Subscription format
assert smgr['abc123'].write(
os.path.join(str(tmpdir0), 'subscriptions.json')) is True
# We can still open this type as well
assert smgr.load(
os.path.join(str(tmpdir0), 'subscriptions.json')) is True
assert bool(smgr) is True
assert len(smgr) == 1

98
test/test_utils_pem.py Normal file
View File

@ -0,0 +1,98 @@
# -*- coding: utf-8 -*-
# BSD 2-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2025, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import logging
import os
from apprise import AppriseAsset
from apprise import PersistentStoreMode
from apprise import utils
# Disable logging for a cleaner testing output
logging.disable(logging.CRITICAL)
# Attachment Directory
TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var')
def test_utils_pem_general(tmpdir):
"""
Utils:PEM
"""
tmpdir0 = tmpdir.mkdir('tmp00')
# Currently no files here
assert os.listdir(str(tmpdir0)) == []
asset = AppriseAsset(
storage_mode=PersistentStoreMode.MEMORY,
storage_path=str(tmpdir0),
pem_autogen=False,
)
# Create a PEM Controller
pem_c = utils.pem.ApprisePEMController(path=None, asset=asset)
# Nothing to lookup
assert pem_c.public_keyfile() is None
assert pem_c.public_key() is None
assert pem_c.x962_str == ''
assert pem_c.encrypt("message") is None
# Keys can not be generated in memory mode
assert pem_c.keygen() is False
asset = AppriseAsset(
storage_mode=PersistentStoreMode.FLUSH,
storage_path=str(tmpdir0),
pem_autogen=False,
)
# No new files
assert os.listdir(str(tmpdir0)) == []
# Our asset is now write mode, so we will be able to generate a key
pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
# Nothing to lookup
assert pem_c.public_keyfile() is None
assert pem_c.public_key() is None
assert pem_c.x962_str == ''
assert pem_c.encrypt("message") is None
# Keys can not be generated in memory mode
assert pem_c.keygen() is True
# We have 2 new key files generated
assert os.listdir(str(tmpdir0)) == ['public_key.pem', 'private_key.pem']
assert pem_c.public_keyfile() is not None
assert pem_c.public_key() is not None
assert len(pem_c.x962_str) > 20
content = pem_c.encrypt("message")
assert isinstance(content, str)
assert pem_c.decrypt(content) == "message"