diff --git a/.coveragerc b/.coveragerc index d6fa055f..eac9df5c 100644 --- a/.coveragerc +++ b/.coveragerc @@ -13,3 +13,5 @@ source = [report] show_missing = True +skip_covered = True +skip_empty = True diff --git a/apprise/plugins/NotifyFCM.py b/apprise/plugins/NotifyFCM.py deleted file mode 100644 index 483f7d66..00000000 --- a/apprise/plugins/NotifyFCM.py +++ /dev/null @@ -1,273 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright (C) 2020 Chris Caron -# All rights reserved. -# -# This code is licensed under the MIT License. -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files(the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions : -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. - -# For this plugin to work correct, the FCM server must be set up to allow -# for remote connections. - -# Firebase Cloud Messaging -# Docs: https://firebase.google.com/docs/cloud-messaging/send-message - -import requests -from json import dumps - -from .NotifyBase import NotifyBase -from ..common import NotifyType -from ..utils import validate_regex -from ..utils import parse_list -from ..AppriseLocale import gettext_lazy as _ - - -class NotifyFCM(NotifyBase): - """ - A wrapper for Google's Firebase Cloud Messaging Notifications - """ - # The default descriptive name associated with the Notification - service_name = 'Firebase Cloud Messaging' - - # The services URL - service_url = 'https://firebase.google.com' - - # The default protocol - secure_protocol = 'fcm' - - # A URL that takes you to the setup/help of the specific protocol - setup_url = 'https://github.com/caronc/apprise/wiki/Notify_fcm' - - # Project Notification - # https://firebase.google.com/docs/cloud-messaging/send-message - notify_url = \ - "https://fcm.googleapis.com/v1/projects/{project}/messages:send" - - # The maximum length of the body - body_maxlen = 160 - - # A title can not be used for SMS Messages. Setting this to zero will - # cause any title (if defined) to get placed into the message body. - title_maxlen = 0 - - # Define object templates - templates = ( - '{schema}://{project}@{apikey}/{targets}', - ) - - # Define our template - template_tokens = dict(NotifyBase.template_tokens, **{ - 'apikey': { - 'name': _('API Key'), - 'type': 'string', - 'private': True, - 'required': True, - }, - 'project': { - 'name': _('Project ID'), - 'type': 'string', - 'required': True, - }, - 'target_device': { - 'name': _('Target Device'), - 'type': 'string', - 'map_to': 'targets', - }, - 'target_topic': { - 'name': _('Target Topic'), - 'type': 'string', - 'prefix': '#', - 'map_to': 'targets', - }, - 'targets': { - 'name': _('Targets'), - 'type': 'list:string', - }, - }) - - template_args = dict(NotifyBase.template_args, **{ - 'to': { - 'alias_of': 'targets', - }, - }) - - def __init__(self, project, apikey, targets=None, **kwargs): - """ - Initialize Firebase Cloud Messaging - - """ - super(NotifyFCM, self).__init__(**kwargs) - - # The apikey associated with the account - self.apikey = validate_regex(apikey) - if not self.apikey: - msg = 'An invalid FCM API key ' \ - '({}) was specified.'.format(apikey) - self.logger.warning(msg) - raise TypeError(msg) - - # The project ID associated with the account - self.project = validate_regex(project) - if not self.project: - msg = 'An invalid FCM Project ID ' \ - '({}) was specified.'.format(project) - self.logger.warning(msg) - raise TypeError(msg) - - # Acquire Device IDs to notify - self.targets = parse_list(targets) - return - - def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs): - """ - Perform FCM Notification - """ - - if not self.targets: - # There is no one to email; we're done - self.logger.warning('There are no devices or topics to notify') - return False - - headers = { - 'User-Agent': self.app_id, - 'Content-Type': 'application/json', - "Authorization": "Bearer {}".format(self.apikey), - } - - has_error = False - # Create a copy of the targets list - targets = list(self.targets) - while len(targets): - recipient = targets.pop(0) - - payload = { - 'message': { - 'token': None, - 'notification': { - 'title': title, - 'body': body, - } - } - } - - if recipient[0] == '#': - payload['message']['topic'] = recipient[1:] - self.logger.debug( - "FCM recipient %s parsed as a topic", - recipient[1:]) - - else: - payload['message']['token'] = recipient - self.logger.debug( - "FCM recipient %s parsed as a device token", - recipient) - - self.logger.debug('FCM POST URL: %s (cert_verify=%r)' % ( - self.notify_url, self.verify_certificate, - )) - self.logger.debug('FCM Payload: %s' % str(payload)) - - # Always call throttle before any remote server i/o is made - self.throttle() - try: - r = requests.post( - self.notify_url.format(project=self.project), - data=dumps(payload), - headers=headers, - verify=self.verify_certificate, - timeout=self.request_timeout, - ) - if r.status_code not in ( - requests.codes.ok, requests.codes.no_content): - # We had a problem - status_str = \ - NotifyFCM.http_response_code_lookup( - r.status_code) - - self.logger.warning( - 'Failed to send FCM notification: ' - '{}{}error={}.'.format( - status_str, - ', ' if status_str else '', - r.status_code)) - - self.logger.debug( - 'Response Details:\r\n%s', r.content) - - has_error = True - - else: - self.logger.info('Sent FCM notification.') - - except requests.RequestException as e: - self.logger.warning( - 'A Connection error occurred sending FCM ' - 'notification.' - ) - self.logger.debug('Socket Exception: %s', str(e)) - - has_error = True - - return not has_error - - def url(self, privacy=False, *args, **kwargs): - """ - Returns the URL built dynamically based on specified arguments. - """ - - # Our URL parameters - params = self.url_parameters(privacy=privacy, *args, **kwargs) - - return '{schema}://{project}@{apikey}/{targets}?{params}'.format( - schema=self.secure_protocol, - project=NotifyFCM.quote(self.project), - apikey=self.pprint(self.apikey, privacy, safe=''), - targets='/'.join( - [NotifyFCM.quote(x) for x in self.targets]), - params=NotifyFCM.urlencode(params), - ) - - @staticmethod - def parse_url(url): - """ - Parses the URL and returns enough arguments that can allow - us to re-instantiate this object. - - """ - results = NotifyBase.parse_url(url, verify_host=False) - if not results: - # We're done early as we couldn't load the results - return results - - # The project identifier associated with the account - results['project'] = NotifyFCM.unquote(results['user']) - - # The apikey is stored in the hostname - results['apikey'] = NotifyFCM.unquote(results['host']) - - # Get our Device IDs - results['targets'] = NotifyFCM.split_path(results['fullpath']) - - # The 'to' makes it easier to use yaml configuration - if 'to' in results['qsd'] and len(results['qsd']['to']): - results['targets'] += \ - NotifyFCM.parse_list(results['qsd']['to']) - - return results diff --git a/apprise/plugins/NotifyFCM/__init__.py b/apprise/plugins/NotifyFCM/__init__.py new file mode 100644 index 00000000..7ed69d4f --- /dev/null +++ b/apprise/plugins/NotifyFCM/__init__.py @@ -0,0 +1,485 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2021 Chris Caron +# All rights reserved. +# +# This code is licensed under the MIT License. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files(the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions : +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +# For this plugin to work correct, the FCM server must be set up to allow +# for remote connections. + +# Firebase Cloud Messaging +# Visit your console page: https://console.firebase.google.com +# 1. Create a project if you haven't already. If you did the +# {project} ID will be listed as name-XXXXX. +# 2. Click on your project from here to open it up. +# 3. Access your Web API Key by clicking on: +# - The (gear-next-to-project-name) > Project Settings > Cloud Messaging + +# Visit the following site to get you're Project information: +# - https://console.cloud.google.com/project/_/settings/general/ +# +# Docs: https://firebase.google.com/docs/cloud-messaging/send-message + +# Legacy Docs: +# https://firebase.google.com/docs/cloud-messaging/http-server-ref\ +# #send-downstream +# +# If you Generate a new private key, it will provide a .json file +# You will need this in order to send an apprise messag +import six +import requests +from json import dumps +from .oauth import GoogleOAuth +from ..NotifyBase import NotifyBase +from ...common import NotifyType +from ...utils import validate_regex +from ...utils import parse_list +from ...AppriseAttachment import AppriseAttachment +from ...AppriseLocale import gettext_lazy as _ + +# Our lookup map +FCM_HTTP_ERROR_MAP = { + 400: 'A bad request was made to the server.', + 401: 'The provided API Key was not valid.', + 404: 'The token could not be registered.', +} + + +class FCMMode(object): + """ + Define the Firebase Cloud Messaging Modes + """ + # The legacy way of sending a message + Legacy = "legacy" + + # The new API + OAuth2 = "oauth2" + + +# FCM Modes +FCM_MODES = ( + # Legacy API + FCMMode.Legacy, + # HTTP v1 URL + FCMMode.OAuth2, +) + + +class NotifyFCM(NotifyBase): + """ + A wrapper for Google's Firebase Cloud Messaging Notifications + """ + # The default descriptive name associated with the Notification + service_name = 'Firebase Cloud Messaging' + + # The services URL + service_url = 'https://firebase.google.com' + + # The default protocol + secure_protocol = 'fcm' + + # A URL that takes you to the setup/help of the specific protocol + setup_url = 'https://github.com/caronc/apprise/wiki/Notify_fcm' + + # Project Notification + # https://firebase.google.com/docs/cloud-messaging/send-message + notify_oauth2_url = \ + "https://fcm.googleapis.com/v1/projects/{project}/messages:send" + + notify_legacy_url = "https://fcm.googleapis.com/fcm/send" + + # There is no reason we should exceed 5KB when reading in a JSON file. + # If it is more than this, then it is not accepted. + max_fcm_keyfile_size = 5000 + + # The maximum length of the body + body_maxlen = 1024 + + # A title can not be used for SMS Messages. Setting this to zero will + # cause any title (if defined) to get placed into the message body. + title_maxlen = 0 + + # Define object templates + templates = ( + # OAuth2 + '{schema}://{project}/{targets}?keyfile={keyfile}', + # Legacy Mode + '{schema}://{apikey}/{targets}', + ) + + # Define our template + template_tokens = dict(NotifyBase.template_tokens, **{ + 'apikey': { + 'name': _('API Key'), + 'type': 'string', + 'private': True, + }, + 'keyfile': { + 'name': _('OAuth2 KeyFile'), + 'type': 'string', + 'private': True, + }, + 'mode': { + 'name': _('Mode'), + 'type': 'choice:string', + 'values': FCM_MODES, + 'default': FCMMode.Legacy, + }, + 'project': { + 'name': _('Project ID'), + 'type': 'string', + 'required': True, + }, + 'target_device': { + 'name': _('Target Device'), + 'type': 'string', + 'map_to': 'targets', + }, + 'target_topic': { + 'name': _('Target Topic'), + 'type': 'string', + 'prefix': '#', + 'map_to': 'targets', + }, + 'targets': { + 'name': _('Targets'), + 'type': 'list:string', + }, + }) + + template_args = dict(NotifyBase.template_args, **{ + 'to': { + 'alias_of': 'targets', + }, + }) + + def __init__(self, project, apikey, targets=None, mode=None, keyfile=None, + **kwargs): + """ + Initialize Firebase Cloud Messaging + + """ + super(NotifyFCM, self).__init__(**kwargs) + + if mode is None: + # Detect our mode + self.mode = FCMMode.OAuth2 if keyfile else FCMMode.Legacy + + else: + # Setup our mode + self.mode = NotifyFCM.template_tokens['mode']['default'] \ + if not isinstance(mode, six.string_types) else mode.lower() + if self.mode and self.mode not in FCM_MODES: + msg = 'The mode specified ({}) is invalid.'.format(mode) + self.logger.warning(msg) + raise TypeError(msg) + + # Used for Legacy Mode; this is the Web API Key retrieved from the + # User Panel + self.apikey = None + + # Path to our Keyfile + self.keyfile = None + + # Our Project ID is required to verify against the keyfile + # specified + self.project = None + + # Initialize our Google OAuth module we can work with + self.oauth = GoogleOAuth( + user_agent=self.app_id, timeout=self.request_timeout, + verify_certificate=self.verify_certificate) + + if self.mode == FCMMode.OAuth2: + # The project ID associated with the account + self.project = validate_regex(project) + if not self.project: + msg = 'An invalid FCM Project ID ' \ + '({}) was specified.'.format(project) + self.logger.warning(msg) + raise TypeError(msg) + + if not keyfile: + msg = 'No FCM JSON KeyFile was specified.' + self.logger.warning(msg) + raise TypeError(msg) + + # Our keyfile object is just an AppriseAttachment object + self.keyfile = AppriseAttachment(asset=self.asset) + # Add our definition to our template + self.keyfile.add(keyfile) + # Enforce maximum file size + self.keyfile[0].max_file_size = self.max_fcm_keyfile_size + + else: # Legacy Mode + + # The apikey associated with the account + self.apikey = validate_regex(apikey) + if not self.apikey: + msg = 'An invalid FCM API key ' \ + '({}) was specified.'.format(apikey) + self.logger.warning(msg) + raise TypeError(msg) + + # Acquire Device IDs to notify + self.targets = parse_list(targets) + return + + @property + def access_token(self): + """ + Generates a access_token based on the keyfile provided + """ + keyfile = self.keyfile[0] + if not keyfile: + # We could not access the keyfile + self.logger.error( + 'Could not access FCM keyfile {}.'.format( + keyfile.url(privacy=True))) + return None + + if not self.oauth.load(keyfile.path): + self.logger.error( + 'FCM keyfile {} could not be loaded.'.format( + keyfile.url(privacy=True))) + return None + + # Verify our project id against the one provided in our keyfile + if self.project != self.oauth.project_id: + self.logger.error( + 'FCM keyfile {} identifies itself for a different project' + .format(keyfile.url(privacy=True))) + return None + + # Return our generated key; the below returns None if a token could + # not be acquired + return self.oauth.access_token + + def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs): + """ + Perform FCM Notification + """ + + if not self.targets: + # There is no one to email; we're done + self.logger.warning('There are no FCM devices or topics to notify') + return False + + if self.mode == FCMMode.OAuth2: + access_token = self.access_token + if not access_token: + # Error message is generated in access_tokengen() so no reason + # to additionally write anything here + return False + + headers = { + 'User-Agent': self.app_id, + 'Content-Type': 'application/json', + "Authorization": "Bearer {}".format(access_token), + } + + # Prepare our notify URL + notify_url = self.notify_oauth2_url + + else: # FCMMode.Legacy + headers = { + 'User-Agent': self.app_id, + 'Content-Type': 'application/json', + "Authorization": "key={}".format(self.apikey), + } + + # Prepare our notify URL + notify_url = self.notify_legacy_url + + has_error = False + # Create a copy of the targets list + targets = list(self.targets) + while len(targets): + recipient = targets.pop(0) + + if self.mode == FCMMode.OAuth2: + payload = { + 'message': { + 'token': None, + 'notification': { + 'title': title, + 'body': body, + } + } + } + + if recipient[0] == '#': + payload['message']['topic'] = recipient[1:] + self.logger.debug( + "FCM recipient %s parsed as a topic", + recipient[1:]) + + else: + payload['message']['token'] = recipient + self.logger.debug( + "FCM recipient %s parsed as a device token", + recipient) + + else: # FCMMode.Legacy + payload = { + 'notification': { + 'notification': { + 'title': title, + 'body': body, + } + } + } + if recipient[0] == '#': + payload['to'] = '/topics/{}'.format(recipient) + self.logger.debug( + "FCM recipient %s parsed as a topic", + recipient[1:]) + + else: + payload['to'] = recipient + self.logger.debug( + "FCM recipient %s parsed as a device token", + recipient) + + self.logger.debug( + 'FCM %s POST URL: %s (cert_verify=%r)', + self.mode, notify_url, self.verify_certificate, + ) + self.logger.debug('FCM %s Payload: %s', self.mode, str(payload)) + + # Always call throttle before any remote server i/o is made + self.throttle() + try: + r = requests.post( + notify_url.format(project=self.project), + data=dumps(payload), + headers=headers, + verify=self.verify_certificate, + timeout=self.request_timeout, + ) + if r.status_code not in ( + requests.codes.ok, requests.codes.no_content): + # We had a problem + status_str = \ + NotifyBase.http_response_code_lookup( + r.status_code, FCM_HTTP_ERROR_MAP) + + self.logger.warning( + 'Failed to send {} FCM notification: ' + '{}{}error={}.'.format( + self.mode, + status_str, + ', ' if status_str else '', + r.status_code)) + + self.logger.debug( + 'Response Details:\r\n%s', r.content) + + has_error = True + + else: + self.logger.info('Sent %s FCM notification.', self.mode) + + except requests.RequestException as e: + self.logger.warning( + 'A Connection error occurred sending FCM ' + 'notification.' + ) + self.logger.debug('Socket Exception: %s', str(e)) + + has_error = True + + return not has_error + + def url(self, privacy=False, *args, **kwargs): + """ + Returns the URL built dynamically based on specified arguments. + """ + + # Define any URL parameters + params = { + 'mode': self.mode, + } + + if self.keyfile: + # Include our keyfile if specified + params['keyfile'] = NotifyFCM.quote( + self.keyfile[0].url(privacy=privacy), safe='') + + # Extend our parameters + params.update(self.url_parameters(privacy=privacy, *args, **kwargs)) + + reference = NotifyFCM.quote(self.project) \ + if self.mode == FCMMode.OAuth2 \ + else self.pprint(self.apikey, privacy, safe='') + + return '{schema}://{reference}/{targets}?{params}'.format( + schema=self.secure_protocol, + reference=reference, + targets='/'.join( + [NotifyFCM.quote(x) for x in self.targets]), + params=NotifyFCM.urlencode(params), + ) + + @staticmethod + def parse_url(url): + """ + Parses the URL and returns enough arguments that can allow + us to re-instantiate this object. + + """ + results = NotifyBase.parse_url(url, verify_host=False) + if not results: + # We're done early as we couldn't load the results + return results + + # The apikey/project is stored in the hostname + results['apikey'] = NotifyFCM.unquote(results['host']) + results['project'] = results['apikey'] + + # Get our Device IDs + results['targets'] = NotifyFCM.split_path(results['fullpath']) + + # Get our mode + results['mode'] = results['qsd'].get('mode') + + # The 'to' makes it easier to use yaml configuration + if 'to' in results['qsd'] and len(results['qsd']['to']): + results['targets'] += \ + NotifyFCM.parse_list(results['qsd']['to']) + + # Our Project ID + if 'project' in results['qsd'] and results['qsd']['project']: + results['project'] = \ + NotifyFCM.unquote(results['qsd']['project']) + + # Our Web API Key + if 'apikey' in results['qsd'] and results['qsd']['apikey']: + results['apikey'] = \ + NotifyFCM.unquote(results['qsd']['apikey']) + + # Our Keyfile (JSON) + if 'keyfile' in results['qsd'] and results['qsd']['keyfile']: + results['keyfile'] = \ + NotifyFCM.unquote(results['qsd']['keyfile']) + + return results diff --git a/apprise/plugins/NotifyFCM/oauth.py b/apprise/plugins/NotifyFCM/oauth.py new file mode 100644 index 00000000..aa9a9675 --- /dev/null +++ b/apprise/plugins/NotifyFCM/oauth.py @@ -0,0 +1,328 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2021 Chris Caron +# All rights reserved. +# +# This code is licensed under the MIT License. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files(the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions : +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# To generate a private key file for your service account: +# +# 1. In the Firebase console, open Settings > Service Accounts. +# 2. Click Generate New Private Key, then confirm by clicking Generate Key. +# 3. Securely store the JSON file containing the key. + +import io +import requests +import base64 +import json +import calendar +from cryptography.hazmat import backends +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives import asymmetric +from cryptography.exceptions import UnsupportedAlgorithm +from datetime import datetime +from datetime import timedelta +from ...logger import logger + +try: + # Python 2.7 + from urllib import urlencode as _urlencode + +except ImportError: + # Python 3.x + from urllib.parse import urlencode as _urlencode + +try: + from json.decoder import JSONDecodeError + +except ImportError: + # Python v2.7 Backwards Compatibility support + JSONDecodeError = ValueError + + +class GoogleOAuth(object): + """ + A OAuth simplified implimentation to Google's Firebase Cloud Messaging + + """ + scopes = [ + 'https://www.googleapis.com/auth/firebase.messaging', + ] + + # 1 hour in seconds (the lifetime of our token) + access_token_lifetime_sec = timedelta(seconds=3600) + + # The default URI to use if one is not found + default_token_uri = 'https://oauth2.googleapis.com/token' + + # Taken right from google.auth.helpers: + clock_skew = timedelta(seconds=10) + + def __init__(self, user_agent=None, timeout=(5, 4), + verify_certificate=True): + """ + Initialize our OAuth object + """ + + # Wether or not to verify ssl + self.verify_certificate = verify_certificate + + # Our (connect, read) timeout + self.request_timeout = timeout + + # assign our user-agent if defined + self.user_agent = user_agent + + # initialize our other object variables + self.__reset() + + def __reset(self): + """ + Reset object internal variables + """ + + # Google Keyfile Encoding + self.encoding = 'utf-8' + + # Our retrieved JSON content (unmangled) + self.content = None + + # Our generated key information we cache once loaded + self.private_key = None + + # Our keys we build using the provided content + self.__refresh_token = None + self.__access_token = None + self.__access_token_expiry = datetime.utcnow() + + def load(self, path): + """ + Generate our SSL details + """ + + # Reset our objects + self.content = None + self.private_key = None + self.__access_token = None + self.__access_token_expiry = datetime.utcnow() + + try: + with io.open(path, mode="r", encoding=self.encoding) as fp: + self.content = json.loads(fp.read()) + + except (OSError, IOError): + logger.debug('FCM keyfile {} could not be accessed'.format(path)) + return False + + except JSONDecodeError as e: + logger.debug( + 'FCM keyfile {} generated a JSONDecodeError: {}'.format( + path, e)) + return False + + if not isinstance(self.content, dict): + logger.debug( + 'FCM keyfile {} is incorrectly structured'.format(path)) + self.__reset() + return False + + # Verify we've got the correct tokens in our content to work with + is_valid = next((False for k in ( + 'client_email', 'private_key_id', 'private_key', + 'type', 'project_id') if not self.content.get(k)), True) + + if not is_valid: + logger.debug( + 'FCM keyfile {} is missing required information'.format(path)) + self.__reset() + return False + + # Verify our service_account type + if self.content.get('type') != 'service_account': + logger.debug( + 'FCM keyfile {} is not of type service_account'.format(path)) + self.__reset() + return False + + # Prepare our private key which is in PKCS8 PEM format + try: + self.private_key = serialization.load_pem_private_key( + self.content.get('private_key').encode(self.encoding), + password=None, backend=backends.default_backend()) + + except (TypeError, ValueError): + # ValueError: If the PEM data could not be decrypted or if its + # structure could not be decoded successfully. + # TypeError: If a password was given and the private key was + # not encrypted. Or if the key was encrypted but + # no password was supplied. + logger.error('FCM provided private key is invalid.') + self.__reset() + return False + + except UnsupportedAlgorithm: + # If the serialized key is of a type that is not supported by + # the backend. + logger.error('FCM provided private key is not supported') + self.__reset() + return False + + # We've done enough validation to move on + return True + + @property + def access_token(self): + """ + Returns our access token (if it hasn't expired yet) + - if we do not have one we'll fetch one. + - if it expired, we'll renew it + - if a key simply can't be acquired, then we return None + """ + + if not self.private_key or not self.content: + # invalid content (or not loaded) + logger.error( + 'No FCM JSON keyfile content loaded to generate a access ' + 'token with.') + return None + + if self.__access_token_expiry > datetime.utcnow(): + # Return our no-expired key + return self.__access_token + + # If we reach here we need to prepare our payload + token_uri = self.content.get('token_uri', self.default_token_uri) + service_email = self.content.get('client_email') + key_identifier = self.content.get('private_key_id') + + # Generate our Assertion + now = datetime.utcnow() + expiry = now + self.access_token_lifetime_sec + + payload = { + # The number of seconds since the UNIX epoch. + "iat": calendar.timegm(now.utctimetuple()), + "exp": calendar.timegm(expiry.utctimetuple()), + # The issuer must be the service account email. + "iss": service_email, + # The audience must be the auth token endpoint's URI + "aud": token_uri, + # Our token scopes + "scope": " ".join(self.scopes), + } + + # JWT Details + header = { + 'typ': 'JWT', + 'alg': 'RS256' if isinstance( + self.private_key, asymmetric.rsa.RSAPrivateKey) else 'ES256', + + # Key Identifier + 'kid': key_identifier, + } + + # Encodes base64 strings removing any padding characters. + segments = [ + base64.urlsafe_b64encode( + json.dumps(header).encode(self.encoding)).rstrip(b"="), + base64.urlsafe_b64encode( + json.dumps(payload).encode(self.encoding)).rstrip(b"="), + ] + + signing_input = b".".join(segments) + signature = self.private_key.sign( + signing_input, + asymmetric.padding.PKCS1v15(), + hashes.SHA256(), + ) + + # Finally append our segment + segments.append(base64.urlsafe_b64encode(signature).rstrip(b"=")) + assertion = b".".join(segments) + + http_payload = _urlencode({ + 'assertion': assertion, + 'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer', + }) + + http_headers = { + 'Content-Type': 'application/x-www-form-urlencoded', + } + if self.user_agent: + http_headers['User-Agent'] = self.user_agent + + logger.info('Refreshing FCM Access Token') + try: + r = requests.post( + token_uri, + data=http_payload, + headers=http_headers, + verify=self.verify_certificate, + timeout=self.request_timeout, + ) + if r.status_code != requests.codes.ok: + # We had a problem + logger.warning( + 'Failed to update FCM Access Token error={}.' + .format(r.status_code)) + + logger.debug( + 'Response Details:\r\n%s', r.content) + return None + + except requests.RequestException as e: + logger.warning( + 'A Connection error occurred refreshing FCM ' + 'Access Token.' + ) + logger.debug('Socket Exception: %s', str(e)) + return None + + # If we get here, we made our request successfully, now we need + # to parse out the data + response = json.loads(r.content) + self.__access_token = response['access_token'] + self.__refresh_token = response.get( + 'refresh_token', self.__refresh_token) + + if 'expires_in' in response: + delta = timedelta(seconds=int(response['expires_in'])) + self.__access_token_expiry = \ + delta + datetime.utcnow() - self.clock_skew + + else: + # Allow some grace before we expire + self.__access_token_expiry = expiry - self.clock_skew + + logger.debug( + 'Access Token successfully acquired: %s', self.__access_token) + + # Return our token + return self.__access_token + + @property + def project_id(self): + """ + Returns the project id found in the file + """ + return None if not self.content \ + else self.content.get('project_id') diff --git a/apprise/plugins/NotifySimplePush.py b/apprise/plugins/NotifySimplePush.py index dd192e79..7e3c022a 100644 --- a/apprise/plugins/NotifySimplePush.py +++ b/apprise/plugins/NotifySimplePush.py @@ -32,24 +32,13 @@ from ..common import NotifyType from ..utils import validate_regex from ..AppriseLocale import gettext_lazy as _ -# Default our global support flag -CRYPTOGRAPHY_AVAILABLE = False - -try: - from cryptography.hazmat.primitives import padding - from cryptography.hazmat.primitives.ciphers import Cipher - from cryptography.hazmat.primitives.ciphers import algorithms - from cryptography.hazmat.primitives.ciphers import modes - from cryptography.hazmat.backends import default_backend - from base64 import urlsafe_b64encode - import hashlib - - CRYPTOGRAPHY_AVAILABLE = True - -except ImportError: - # no problem; this just means the added encryption functionality isn't - # available. You can still send a SimplePush message - pass +from cryptography.hazmat.primitives import padding +from cryptography.hazmat.primitives.ciphers import Cipher +from cryptography.hazmat.primitives.ciphers import algorithms +from cryptography.hazmat.primitives.ciphers import modes +from cryptography.hazmat.backends import default_backend +from base64 import urlsafe_b64encode +import hashlib class NotifySimplePush(NotifyBase): @@ -181,15 +170,6 @@ class NotifySimplePush(NotifyBase): Perform SimplePush Notification """ - # Encrypt Message (providing support is available) - if self.password and self.user and not CRYPTOGRAPHY_AVAILABLE: - # Provide the end user at least some notification that they're - # not getting what they asked for - self.logger.warning( - "Authenticated SimplePush Notifications are not supported by " - "this system; `pip install cryptography`.") - return False - headers = { 'User-Agent': self.app_id, 'Content-type': "application/x-www-form-urlencoded", @@ -200,7 +180,7 @@ class NotifySimplePush(NotifyBase): 'key': self.apikey, } - if self.password and self.user and CRYPTOGRAPHY_AVAILABLE: + if self.password and self.user: body = self._encrypt(body) title = self._encrypt(title) payload.update({ diff --git a/dev-requirements.txt b/dev-requirements.txt index 4faacc5f..57ae8b85 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -10,9 +10,6 @@ babel # Plugin Dependencies # -# Used for test cases on spush:// -cryptography - # Provides xmpp:// support sleekxmpp diff --git a/requirements.txt b/requirements.txt index 32510239..b2883c03 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ +cryptography requests requests-oauthlib six diff --git a/test/test_fcm_plugin.py b/test/test_fcm_plugin.py new file mode 100644 index 00000000..1eab67dc --- /dev/null +++ b/test/test_fcm_plugin.py @@ -0,0 +1,304 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2021 Chris Caron +# All rights reserved. +# +# This code is licensed under the MIT License. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files(the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions : +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +import io +import os +import mock +import requests +import json +from apprise import Apprise +from apprise import plugins +from apprise.plugins.NotifyFCM.oauth import GoogleOAuth +from cryptography.exceptions import UnsupportedAlgorithm + +try: + from json.decoder import JSONDecodeError + +except ImportError: + # Python v2.7 Backwards Compatibility support + JSONDecodeError = ValueError + +# Test files for KeyFile Directory +PRIVATE_KEYFILE_DIR = os.path.join(os.path.dirname(__file__), 'var', 'fcm') + + +@mock.patch('requests.post') +def test_fcm_plugin(mock_post): + """ + API: NotifyFCM() General Checks + + """ + # Valid Keyfile + path = os.path.join(PRIVATE_KEYFILE_DIR, 'service_account.json') + + # Disable Throttling to speed testing + plugins.NotifyBase.request_rate_per_sec = 0 + + # Prepare a good response + response = mock.Mock() + response.content = json.dumps({ + "access_token": "ya29.c.abcd", + "expires_in": 3599, + "token_type": "Bearer", + }) + response.status_code = requests.codes.ok + mock_post.return_value = response + + # Test having a valid keyfile, but not a valid project id match + obj = Apprise.instantiate( + 'fcm://invalid_project_id/device/?keyfile={}'.format(str(path))) + # we'll fail as a result + assert obj.notify("test") is False + + # Test our call count + assert mock_post.call_count == 0 + + # Now we test using a valid Project ID but we can't open our file + obj = Apprise.instantiate( + 'fcm://mock-project-id/device/?keyfile={}'.format(str(path))) + + with mock.patch('io.open', side_effect=OSError): + # we'll fail as a result + assert obj.notify("test") is False + + # Test our call count + assert mock_post.call_count == 0 + + # Now we test using a valid Project ID + obj = Apprise.instantiate( + 'fcm://mock-project-id/device/#topic/?keyfile={}'.format(str(path))) + + # we'll fail as a result + assert obj.notify("test") is True + + # Test our call count + assert mock_post.call_count == 3 + assert mock_post.call_args_list[0][0][0] == \ + 'https://accounts.google.com/o/oauth2/token' + assert mock_post.call_args_list[1][0][0] == \ + 'https://fcm.googleapis.com/v1/projects/mock-project-id/messages:send' + assert mock_post.call_args_list[2][0][0] == \ + 'https://fcm.googleapis.com/v1/projects/mock-project-id/messages:send' + + +@mock.patch('requests.post') +def test_fcm_keyfile_parse(mock_post): + """ + API: NotifyFCM() KeyFile Tests + """ + + # Prepare a good response + response = mock.Mock() + response.content = json.dumps({ + "access_token": "ya29.c.abcd", + "expires_in": 3599, + "token_type": "Bearer", + }) + response.status_code = requests.codes.ok + mock_post.return_value = response + + path = os.path.join(PRIVATE_KEYFILE_DIR, 'service_account.json') + oauth = GoogleOAuth() + # We can not get an Access Token without content loaded + assert oauth.access_token is None + + # Load our content + assert oauth.load(path) is True + assert oauth.access_token is not None + + # Test our call count + assert mock_post.call_count == 1 + assert mock_post.call_args_list[0][0][0] == \ + 'https://accounts.google.com/o/oauth2/token' + + mock_post.reset_mock() + # a second call uses cache since our token hasn't expired yet + assert oauth.access_token is not None + assert mock_post.call_count == 0 + + # Same test case without expires_in entry + mock_post.reset_mock() + response.content = json.dumps({ + "access_token": "ya29.c.abcd", + "token_type": "Bearer", + }) + + oauth = GoogleOAuth() + assert oauth.load(path) is True + assert oauth.access_token is not None + + # Test our call count + assert mock_post.call_count == 1 + assert mock_post.call_args_list[0][0][0] == \ + 'https://accounts.google.com/o/oauth2/token' + + # Test user-agent override + mock_post.reset_mock() + oauth = GoogleOAuth(user_agent="test-agent-override") + assert oauth.load(path) is True + assert oauth.access_token is not None + assert mock_post.call_count == 1 + assert mock_post.call_args_list[0][0][0] == \ + 'https://accounts.google.com/o/oauth2/token' + + # + # Test some errors that can get thrown when trying to handle + # the service_account.json file + # + + # Reset our object + mock_post.reset_mock() + + # Now we test a case where we can't access the file we've been pointed to: + oauth = GoogleOAuth() + with mock.patch('io.open', side_effect=OSError): + # We will fail to retrieve our Access Token + assert oauth.load(path) is False + assert oauth.access_token is None + + oauth = GoogleOAuth() + with mock.patch('json.loads', side_effect=([], )): + # We will fail to retrieve our Access Token since we did not parse + # a dictionary + assert oauth.load(path) is False + assert oauth.access_token is None + + # Case where we can't load the PEM key: + oauth = GoogleOAuth() + with mock.patch( + 'cryptography.hazmat.primitives.serialization' + '.load_pem_private_key', + side_effect=ValueError("")): + assert oauth.load(path) is False + assert oauth.access_token is None + + # Case where we can't load the PEM key: + oauth = GoogleOAuth() + with mock.patch( + 'cryptography.hazmat.primitives.serialization' + '.load_pem_private_key', + side_effect=TypeError("")): + assert oauth.load(path) is False + assert oauth.access_token is None + + # Case where we can't load the PEM key: + oauth = GoogleOAuth() + with mock.patch( + 'cryptography.hazmat.primitives.serialization' + '.load_pem_private_key', + side_effect=UnsupportedAlgorithm("")): + # Note: This test should be te + assert oauth.load(path) is False + assert oauth.access_token is None + + # Not one call was made to the web + assert mock_post.call_count == 0 + + # + # Test some web errors that can occur when speaking upstream + # with Google to get our token generated + # + response.status_code = requests.codes.internal_server_error + + mock_post.reset_mock() + oauth = GoogleOAuth() + assert oauth.load(path) is True + + # We'll fail due to an bad web response + assert oauth.access_token is None + + # Return our status code to how it was + response.status_code = requests.codes.ok + + # No access token + bad_response_1 = mock.Mock() + bad_response_1.content = json.dumps({ + "expires_in": 3599, + "token_type": "Bearer", + }) + + # Invalid JSON + bad_response_2 = mock.Mock() + bad_response_2.content = '{' + + mock_post.return_value = None + # Throw an exception on the first call to requests.post() + for side_effect in ( + requests.RequestException(), bad_response_1, bad_response_2): + mock_post.side_effect = side_effect + + # Test all of our bad side effects + oauth = GoogleOAuth() + assert oauth.load(path) is True + + # We'll fail due to an bad web response + assert oauth.access_token is None + + +def test_fcm_bad_keyfile_parse(): + """ + API: NotifyFCM() KeyFile Bad Service Account Type Tests + """ + + path = os.path.join(PRIVATE_KEYFILE_DIR, 'service_account-bad-type.json') + oauth = GoogleOAuth() + assert oauth.load(path) is False + + +def test_fcm_keyfile_missing_entries_parse(tmpdir): + """ + API: NotifyFCM() KeyFile Missing Entries Test + """ + + # Prepare a base keyfile reference to use + path = os.path.join(PRIVATE_KEYFILE_DIR, 'service_account.json') + with io.open(path, mode="r", encoding='utf-8') as fp: + content = json.loads(fp.read()) + + path = tmpdir.join('fcm_keyfile.json') + + # Test that we fail to load if the following keys are missing: + for entry in ( + 'client_email', 'private_key_id', 'private_key', 'type', + 'project_id'): + + # Ensure the key actually exists in our file + assert entry in content + + # Create a copy of our content + content_copy = content.copy() + + # Remove our entry we expect to validate against + del content_copy[entry] + assert entry not in content_copy + + path.write(json.dumps(content_copy)) + + oauth = GoogleOAuth() + assert oauth.load(str(path)) is False + + # Now write ourselves a bad JSON file + path.write('{') + oauth = GoogleOAuth() + assert oauth.load(str(path)) is False diff --git a/test/test_rest_plugins.py b/test/test_rest_plugins.py index c0828ead..773d4886 100644 --- a/test/test_rest_plugins.py +++ b/test/test_rest_plugins.py @@ -604,50 +604,99 @@ TEST_URLS = ( # We failed to identify any valid authentication 'instance': TypeError, }), - ('fcm://apikey/', { - # no project id specified - 'instance': TypeError, - }), ('fcm://project@%20%20/', { # invalid apikey 'instance': TypeError, }), - ('fcm://project@apikey/', { - # No targets specified; we will initialize but not notify anything + ('fcm://apikey/', { + # no project id specified so we operate in legacy mode 'instance': plugins.NotifyFCM, + # but there are no targets specified so we return False 'notify_response': False, }), - ('fcm://project@apikey/device', { + ('fcm://apikey/device', { # Valid device 'instance': plugins.NotifyFCM, - 'privacy_url': 'fcm://project@a...y/device', + 'privacy_url': 'fcm://a...y/device', }), - ('fcm://project@apikey/#topic', { + ('fcm://apikey/#topic', { # Valid topic 'instance': plugins.NotifyFCM, - 'privacy_url': 'fcm://project@a...y/%23topic', + 'privacy_url': 'fcm://a...y/%23topic', }), - ('fcm://project@apikey/#topic1/device/%20/', { + ('fcm://apikey/device?mode=invalid', { + # Valid device, invalid mode + 'instance': TypeError, + }), + ('fcm://apikey/#topic1/device/%20/', { # Valid topic, valid device, and invalid entry 'instance': plugins.NotifyFCM, }), - ('fcm://project@apikey?to=#topic1,device', { + ('fcm://apikey?to=#topic1,device', { # Test to= 'instance': plugins.NotifyFCM, }), - ('fcm://project@apikey/#topic1/device/', { + ('fcm://?apikey=abc123&to=device', { + # Test apikey= to= + 'instance': plugins.NotifyFCM, + }), + ('fcm://%20?to=device&keyfile=/invalid/path', { + # invalid Project ID + 'instance': TypeError, + }), + ('fcm://project_id?to=device&keyfile=/invalid/path', { + # Test to= and auto detection of oauth mode + 'instance': plugins.NotifyFCM, + # we'll fail to send our notification as a result + 'response': False, + }), + ('fcm://?to=device&project=project_id&keyfile=/invalid/path', { + # Test project= & to= and auto detection of oauth mode + 'instance': plugins.NotifyFCM, + # we'll fail to send our notification as a result + 'response': False, + }), + ('fcm://project_id?to=device&mode=oauth2', { + # no keyfile was specified + 'instance': TypeError, + }), + ('fcm://project_id?to=device&mode=oauth2&keyfile=/invalid/path', { + # Same test as above except we explicitly set our oauth2 mode + # Test to= and auto detection of oauth mode + 'instance': plugins.NotifyFCM, + # we'll fail to send our notification as a result + 'response': False, + }), + ('fcm://apikey/#topic1/device/?mode=legacy', { 'instance': plugins.NotifyFCM, # throw a bizzare code forcing us to fail to look it up 'response': False, 'requests_response_code': 999, }), - ('fcm://project@apikey/#topic1/device/', { + ('fcm://apikey/#topic1/device/?mode=legacy', { 'instance': plugins.NotifyFCM, # Throws a series of connection and transfer exceptions when this flag # is set and tests that we gracfully handle them 'test_requests_exceptions': True, }), - + ('fcm://project/#topic1/device/?mode=oauth2&keyfile=file://{}'.format( + os.path.join( + os.path.dirname(__file__), 'var', 'fcm', + 'service_account.json')), { + 'instance': plugins.NotifyFCM, + # throw a bizzare code forcing us to fail to look it up + 'response': False, + 'requests_response_code': 999, + }), + ('fcm://projectid/#topic1/device/?mode=oauth2&keyfile=file://{}'.format( + os.path.join( + os.path.dirname(__file__), 'var', 'fcm', + 'service_account.json')), { + 'instance': plugins.NotifyFCM, + # Throws a series of connection and transfer exceptions when + # this flag is set and tests that we gracfully handle them + 'test_requests_exceptions': True, + }), ################################## # NotifyFlock ################################## diff --git a/test/test_simplepush_plugin.py b/test/test_simplepush_plugin.py deleted file mode 100644 index e605f174..00000000 --- a/test/test_simplepush_plugin.py +++ /dev/null @@ -1,93 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright (C) 2019 Chris Caron -# All rights reserved. -# -# This code is licensed under the MIT License. -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files(the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions : -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. - -import os -import sys -import pytest -import apprise - -try: - # Python v3.4+ - from importlib import reload -except ImportError: - try: - # Python v3.0-v3.3 - from imp import reload - except ImportError: - # Python v2.7 - pass - -# Disable logging for a cleaner testing output -import logging -logging.disable(logging.CRITICAL) - - -@pytest.mark.skipif( - 'cryptography' not in sys.modules, reason="requires cryptography") -def test_simplepush_plugin(tmpdir): - """ - API: NotifySimplePush Plugin() - - """ - suite = tmpdir.mkdir("simplepush") - suite.join("__init__.py").write('') - module_name = 'cryptography' - suite.join("{}.py".format(module_name)).write('raise ImportError()') - - # Update our path to point to our new test suite - sys.path.insert(0, str(suite)) - - for name in list(sys.modules.keys()): - if name.startswith('{}.'.format(module_name)): - del sys.modules[name] - del sys.modules[module_name] - - # The following libraries need to be reloaded to prevent - # TypeError: super(type, obj): obj must be an instance or subtype of type - # This is better explained in this StackOverflow post: - # https://stackoverflow.com/questions/31363311/\ - # any-way-to-manually-fix-operation-of-\ - # super-after-ipython-reload-avoiding-ty - # - reload(sys.modules['apprise.plugins.NotifySimplePush']) - reload(sys.modules['apprise.plugins']) - reload(sys.modules['apprise.Apprise']) - reload(sys.modules['apprise']) - - # Without the cryptography library objects can still be instantiated - # however notifications will fail - obj = apprise.Apprise.instantiate('spush://salt:pass@valid_api_key') - assert obj is not None - - # We can't notify with a user/pass combo and no cryptography library - assert obj.notify( - title="test message title", body="message body") is False - - # Tidy-up / restore things to how they were - os.unlink(str(suite.join("{}.py".format(module_name)))) - reload(sys.modules['apprise.plugins.NotifySimplePush']) - reload(sys.modules['apprise.plugins']) - reload(sys.modules['apprise.Apprise']) - reload(sys.modules['apprise']) diff --git a/test/var/fcm/service_account-bad-type.json b/test/var/fcm/service_account-bad-type.json new file mode 100644 index 00000000..5b47755b --- /dev/null +++ b/test/var/fcm/service_account-bad-type.json @@ -0,0 +1,12 @@ +{ + "type": "bad_type", + "project_id": "mock-project-id", + "private_key_id": "mock-key-id-1", + "private_key": "-----BEGIN RSA PRIVATE KEY-----\nMIIEpAIBAAKCAQEAwJENcRev+eXZKvhhWLiV3Lz2MvO+naQRHo59g3vaNQnbgyduN/L4krlr\nJ5c6FiikXdtJNb/QrsAHSyJWCu8j3T9CruiwbidGAk2W0RuViTVspjHUTsIHExx9euWM0Uom\nGvYkoqXahdhPL/zViVSJt+Rt8bHLsMvpb8RquTIb9iKY3SMV2tCofNmyCSgVbghq/y7lKORt\nV/IRguWs6R22fbkb0r2MCYoNAbZ9dqnbRIFNZBC7itYtUoTEresRWcyFMh0zfAIJycWOJlVL\nDLqkY2SmIx8u7fuysCg1wcoSZoStuDq02nZEMw1dx8HGzE0hynpHlloRLByuIuOAfMCCYwID\nAQABAoIBADFtihu7TspAO0wSUTpqttzgC/nsIsNn95T2UjVLtyjiDNxPZLUrwq42tdCFur0x\nVW9Z+CK5x6DzXWvltlw8IeKKeF1ZEOBVaFzy+YFXKTz835SROcO1fgdjyrme7lRSShGlmKW/\nGKY+baUNquoDLw5qreXaE0SgMp0jt5ktyYuVxvhLDeV4omw2u6waoGkifsGm8lYivg5l3VR7\nw2IVOvYZTt4BuSYVwOM+qjwaS1vtL7gv0SUjrj85Ja6zERRdFiITDhZw6nsvacr9/+/aut9E\naL/koSSb62g5fntQMEwoT4hRnjPnAedmorM9Rhddh2TB3ZKTBbMN1tUk3fJxOuECgYEA+z6l\neSaAcZ3qvwpntcXSpwwJ0SSmzLTH2RJNf+Ld3eBHiSvLTG53dWB7lJtF4R1KcIwf+KGcOFJv\nsnepzcZBylRvT8RrAAkV0s9OiVm1lXZyaepbLg4GGFJBPi8A6VIAj7zYknToRApdW0s1x/XX\nChewfJDckqsevTMovdbg8YkCgYEAxDYX+3mfvv/opo6HNNY3SfVunM+4vVJL+n8gWZ2w9kz3\nQ9Ub9YbRmI7iQaiVkO5xNuoG1n9bM+3Mnm84aQ1YeNT01YqeyQsipP5Wi+um0PzYTaBw9RO+\n8Gh6992OwlJiRtFk5WjalNWOxY4MU0ImnJwIfKQlUODvLmcixm68NYsCgYEAuAqI3jkk55Vd\nKvotREsX5wP7gPePM+7NYiZ1HNQL4Ab1f/bTojZdTV8Sx6YCR0fUiqMqnE+OBvfkGGBtw22S\nLesx6sWf99Ov58+x4Q0U5dpxL0Lb7d2Z+2Dtp+Z4jXFjNeeI4ae/qG/LOR/b0pE0J5F415ap\n7Mpq5v89vepUtrkCgYAjMXytu4v+q1Ikhc4UmRPDrUUQ1WVSd+9u19yKlnFGTFnRjej86hiw\nH3jPxBhHra0a53EgiilmsBGSnWpl1WH4EmJz5vBCKUAmjgQiBrueIqv9iHiaTNdjsanUyaWw\njyxXfXl2eI80QPXh02+8g1H/pzESgjK7Rg1AqnkfVH9nrwKBgQDJVxKBPTw9pigYMVt9iHrR\niCl9zQVjRMbWiPOc0J56+/5FZYm/AOGl9rfhQ9vGxXZYZiOP5FsNkwt05Y1UoAAH4B4VQwbL\nqod71qOcI0ywgZiIR87CYw40gzRfjWnN+YEEW1qfyoNLilEwJB8iB/T+ZePHGmJ4MmQ/cTn9\nxpdLXA==\n-----END RSA PRIVATE KEY-----", + "client_email": "mock-email@mock-project.iam.gserviceaccount.com", + "client_id": "1234567890", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://accounts.google.com/o/oauth2/token", + "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", + "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/mock-project-id.iam.gserviceaccount.com" +} diff --git a/test/var/fcm/service_account.json b/test/var/fcm/service_account.json new file mode 100644 index 00000000..ee8357f8 --- /dev/null +++ b/test/var/fcm/service_account.json @@ -0,0 +1,12 @@ +{ + "type": "service_account", + "project_id": "mock-project-id", + "private_key_id": "mock-key-id-1", + "private_key": "-----BEGIN RSA PRIVATE KEY-----\nMIIEpAIBAAKCAQEAwJENcRev+eXZKvhhWLiV3Lz2MvO+naQRHo59g3vaNQnbgyduN/L4krlr\nJ5c6FiikXdtJNb/QrsAHSyJWCu8j3T9CruiwbidGAk2W0RuViTVspjHUTsIHExx9euWM0Uom\nGvYkoqXahdhPL/zViVSJt+Rt8bHLsMvpb8RquTIb9iKY3SMV2tCofNmyCSgVbghq/y7lKORt\nV/IRguWs6R22fbkb0r2MCYoNAbZ9dqnbRIFNZBC7itYtUoTEresRWcyFMh0zfAIJycWOJlVL\nDLqkY2SmIx8u7fuysCg1wcoSZoStuDq02nZEMw1dx8HGzE0hynpHlloRLByuIuOAfMCCYwID\nAQABAoIBADFtihu7TspAO0wSUTpqttzgC/nsIsNn95T2UjVLtyjiDNxPZLUrwq42tdCFur0x\nVW9Z+CK5x6DzXWvltlw8IeKKeF1ZEOBVaFzy+YFXKTz835SROcO1fgdjyrme7lRSShGlmKW/\nGKY+baUNquoDLw5qreXaE0SgMp0jt5ktyYuVxvhLDeV4omw2u6waoGkifsGm8lYivg5l3VR7\nw2IVOvYZTt4BuSYVwOM+qjwaS1vtL7gv0SUjrj85Ja6zERRdFiITDhZw6nsvacr9/+/aut9E\naL/koSSb62g5fntQMEwoT4hRnjPnAedmorM9Rhddh2TB3ZKTBbMN1tUk3fJxOuECgYEA+z6l\neSaAcZ3qvwpntcXSpwwJ0SSmzLTH2RJNf+Ld3eBHiSvLTG53dWB7lJtF4R1KcIwf+KGcOFJv\nsnepzcZBylRvT8RrAAkV0s9OiVm1lXZyaepbLg4GGFJBPi8A6VIAj7zYknToRApdW0s1x/XX\nChewfJDckqsevTMovdbg8YkCgYEAxDYX+3mfvv/opo6HNNY3SfVunM+4vVJL+n8gWZ2w9kz3\nQ9Ub9YbRmI7iQaiVkO5xNuoG1n9bM+3Mnm84aQ1YeNT01YqeyQsipP5Wi+um0PzYTaBw9RO+\n8Gh6992OwlJiRtFk5WjalNWOxY4MU0ImnJwIfKQlUODvLmcixm68NYsCgYEAuAqI3jkk55Vd\nKvotREsX5wP7gPePM+7NYiZ1HNQL4Ab1f/bTojZdTV8Sx6YCR0fUiqMqnE+OBvfkGGBtw22S\nLesx6sWf99Ov58+x4Q0U5dpxL0Lb7d2Z+2Dtp+Z4jXFjNeeI4ae/qG/LOR/b0pE0J5F415ap\n7Mpq5v89vepUtrkCgYAjMXytu4v+q1Ikhc4UmRPDrUUQ1WVSd+9u19yKlnFGTFnRjej86hiw\nH3jPxBhHra0a53EgiilmsBGSnWpl1WH4EmJz5vBCKUAmjgQiBrueIqv9iHiaTNdjsanUyaWw\njyxXfXl2eI80QPXh02+8g1H/pzESgjK7Rg1AqnkfVH9nrwKBgQDJVxKBPTw9pigYMVt9iHrR\niCl9zQVjRMbWiPOc0J56+/5FZYm/AOGl9rfhQ9vGxXZYZiOP5FsNkwt05Y1UoAAH4B4VQwbL\nqod71qOcI0ywgZiIR87CYw40gzRfjWnN+YEEW1qfyoNLilEwJB8iB/T+ZePHGmJ4MmQ/cTn9\nxpdLXA==\n-----END RSA PRIVATE KEY-----", + "client_email": "mock-email@mock-project.iam.gserviceaccount.com", + "client_id": "1234567890", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://accounts.google.com/o/oauth2/token", + "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", + "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/mock-project-id.iam.gserviceaccount.com" +}