Improved FCM Support Supporting both Legacy and OAuth2 Methods (#353)

pull/363/head
Chris Caron 2021-02-14 14:03:12 -05:00 committed by GitHub
parent ca22b931ca
commit 4a2f60e338
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 1216 additions and 412 deletions

View File

@ -13,3 +13,5 @@ source =
[report]
show_missing = True
skip_covered = True
skip_empty = True

View File

@ -1,273 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2020 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# For this plugin to work correct, the FCM server must be set up to allow
# for remote connections.
# Firebase Cloud Messaging
# Docs: https://firebase.google.com/docs/cloud-messaging/send-message
import requests
from json import dumps
from .NotifyBase import NotifyBase
from ..common import NotifyType
from ..utils import validate_regex
from ..utils import parse_list
from ..AppriseLocale import gettext_lazy as _
class NotifyFCM(NotifyBase):
"""
A wrapper for Google's Firebase Cloud Messaging Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Firebase Cloud Messaging'
# The services URL
service_url = 'https://firebase.google.com'
# The default protocol
secure_protocol = 'fcm'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_fcm'
# Project Notification
# https://firebase.google.com/docs/cloud-messaging/send-message
notify_url = \
"https://fcm.googleapis.com/v1/projects/{project}/messages:send"
# The maximum length of the body
body_maxlen = 160
# A title can not be used for SMS Messages. Setting this to zero will
# cause any title (if defined) to get placed into the message body.
title_maxlen = 0
# Define object templates
templates = (
'{schema}://{project}@{apikey}/{targets}',
)
# Define our template
template_tokens = dict(NotifyBase.template_tokens, **{
'apikey': {
'name': _('API Key'),
'type': 'string',
'private': True,
'required': True,
},
'project': {
'name': _('Project ID'),
'type': 'string',
'required': True,
},
'target_device': {
'name': _('Target Device'),
'type': 'string',
'map_to': 'targets',
},
'target_topic': {
'name': _('Target Topic'),
'type': 'string',
'prefix': '#',
'map_to': 'targets',
},
'targets': {
'name': _('Targets'),
'type': 'list:string',
},
})
template_args = dict(NotifyBase.template_args, **{
'to': {
'alias_of': 'targets',
},
})
def __init__(self, project, apikey, targets=None, **kwargs):
"""
Initialize Firebase Cloud Messaging
"""
super(NotifyFCM, self).__init__(**kwargs)
# The apikey associated with the account
self.apikey = validate_regex(apikey)
if not self.apikey:
msg = 'An invalid FCM API key ' \
'({}) was specified.'.format(apikey)
self.logger.warning(msg)
raise TypeError(msg)
# The project ID associated with the account
self.project = validate_regex(project)
if not self.project:
msg = 'An invalid FCM Project ID ' \
'({}) was specified.'.format(project)
self.logger.warning(msg)
raise TypeError(msg)
# Acquire Device IDs to notify
self.targets = parse_list(targets)
return
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform FCM Notification
"""
if not self.targets:
# There is no one to email; we're done
self.logger.warning('There are no devices or topics to notify')
return False
headers = {
'User-Agent': self.app_id,
'Content-Type': 'application/json',
"Authorization": "Bearer {}".format(self.apikey),
}
has_error = False
# Create a copy of the targets list
targets = list(self.targets)
while len(targets):
recipient = targets.pop(0)
payload = {
'message': {
'token': None,
'notification': {
'title': title,
'body': body,
}
}
}
if recipient[0] == '#':
payload['message']['topic'] = recipient[1:]
self.logger.debug(
"FCM recipient %s parsed as a topic",
recipient[1:])
else:
payload['message']['token'] = recipient
self.logger.debug(
"FCM recipient %s parsed as a device token",
recipient)
self.logger.debug('FCM POST URL: %s (cert_verify=%r)' % (
self.notify_url, self.verify_certificate,
))
self.logger.debug('FCM Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
self.notify_url.format(project=self.project),
data=dumps(payload),
headers=headers,
verify=self.verify_certificate,
timeout=self.request_timeout,
)
if r.status_code not in (
requests.codes.ok, requests.codes.no_content):
# We had a problem
status_str = \
NotifyFCM.http_response_code_lookup(
r.status_code)
self.logger.warning(
'Failed to send FCM notification: '
'{}{}error={}.'.format(
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n%s', r.content)
has_error = True
else:
self.logger.info('Sent FCM notification.')
except requests.RequestException as e:
self.logger.warning(
'A Connection error occurred sending FCM '
'notification.'
)
self.logger.debug('Socket Exception: %s', str(e))
has_error = True
return not has_error
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Our URL parameters
params = self.url_parameters(privacy=privacy, *args, **kwargs)
return '{schema}://{project}@{apikey}/{targets}?{params}'.format(
schema=self.secure_protocol,
project=NotifyFCM.quote(self.project),
apikey=self.pprint(self.apikey, privacy, safe=''),
targets='/'.join(
[NotifyFCM.quote(x) for x in self.targets]),
params=NotifyFCM.urlencode(params),
)
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results
# The project identifier associated with the account
results['project'] = NotifyFCM.unquote(results['user'])
# The apikey is stored in the hostname
results['apikey'] = NotifyFCM.unquote(results['host'])
# Get our Device IDs
results['targets'] = NotifyFCM.split_path(results['fullpath'])
# The 'to' makes it easier to use yaml configuration
if 'to' in results['qsd'] and len(results['qsd']['to']):
results['targets'] += \
NotifyFCM.parse_list(results['qsd']['to'])
return results

View File

@ -0,0 +1,485 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2021 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# For this plugin to work correct, the FCM server must be set up to allow
# for remote connections.
# Firebase Cloud Messaging
# Visit your console page: https://console.firebase.google.com
# 1. Create a project if you haven't already. If you did the
# {project} ID will be listed as name-XXXXX.
# 2. Click on your project from here to open it up.
# 3. Access your Web API Key by clicking on:
# - The (gear-next-to-project-name) > Project Settings > Cloud Messaging
# Visit the following site to get you're Project information:
# - https://console.cloud.google.com/project/_/settings/general/
#
# Docs: https://firebase.google.com/docs/cloud-messaging/send-message
# Legacy Docs:
# https://firebase.google.com/docs/cloud-messaging/http-server-ref\
# #send-downstream
#
# If you Generate a new private key, it will provide a .json file
# You will need this in order to send an apprise messag
import six
import requests
from json import dumps
from .oauth import GoogleOAuth
from ..NotifyBase import NotifyBase
from ...common import NotifyType
from ...utils import validate_regex
from ...utils import parse_list
from ...AppriseAttachment import AppriseAttachment
from ...AppriseLocale import gettext_lazy as _
# Our lookup map
FCM_HTTP_ERROR_MAP = {
400: 'A bad request was made to the server.',
401: 'The provided API Key was not valid.',
404: 'The token could not be registered.',
}
class FCMMode(object):
"""
Define the Firebase Cloud Messaging Modes
"""
# The legacy way of sending a message
Legacy = "legacy"
# The new API
OAuth2 = "oauth2"
# FCM Modes
FCM_MODES = (
# Legacy API
FCMMode.Legacy,
# HTTP v1 URL
FCMMode.OAuth2,
)
class NotifyFCM(NotifyBase):
"""
A wrapper for Google's Firebase Cloud Messaging Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Firebase Cloud Messaging'
# The services URL
service_url = 'https://firebase.google.com'
# The default protocol
secure_protocol = 'fcm'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_fcm'
# Project Notification
# https://firebase.google.com/docs/cloud-messaging/send-message
notify_oauth2_url = \
"https://fcm.googleapis.com/v1/projects/{project}/messages:send"
notify_legacy_url = "https://fcm.googleapis.com/fcm/send"
# There is no reason we should exceed 5KB when reading in a JSON file.
# If it is more than this, then it is not accepted.
max_fcm_keyfile_size = 5000
# The maximum length of the body
body_maxlen = 1024
# A title can not be used for SMS Messages. Setting this to zero will
# cause any title (if defined) to get placed into the message body.
title_maxlen = 0
# Define object templates
templates = (
# OAuth2
'{schema}://{project}/{targets}?keyfile={keyfile}',
# Legacy Mode
'{schema}://{apikey}/{targets}',
)
# Define our template
template_tokens = dict(NotifyBase.template_tokens, **{
'apikey': {
'name': _('API Key'),
'type': 'string',
'private': True,
},
'keyfile': {
'name': _('OAuth2 KeyFile'),
'type': 'string',
'private': True,
},
'mode': {
'name': _('Mode'),
'type': 'choice:string',
'values': FCM_MODES,
'default': FCMMode.Legacy,
},
'project': {
'name': _('Project ID'),
'type': 'string',
'required': True,
},
'target_device': {
'name': _('Target Device'),
'type': 'string',
'map_to': 'targets',
},
'target_topic': {
'name': _('Target Topic'),
'type': 'string',
'prefix': '#',
'map_to': 'targets',
},
'targets': {
'name': _('Targets'),
'type': 'list:string',
},
})
template_args = dict(NotifyBase.template_args, **{
'to': {
'alias_of': 'targets',
},
})
def __init__(self, project, apikey, targets=None, mode=None, keyfile=None,
**kwargs):
"""
Initialize Firebase Cloud Messaging
"""
super(NotifyFCM, self).__init__(**kwargs)
if mode is None:
# Detect our mode
self.mode = FCMMode.OAuth2 if keyfile else FCMMode.Legacy
else:
# Setup our mode
self.mode = NotifyFCM.template_tokens['mode']['default'] \
if not isinstance(mode, six.string_types) else mode.lower()
if self.mode and self.mode not in FCM_MODES:
msg = 'The mode specified ({}) is invalid.'.format(mode)
self.logger.warning(msg)
raise TypeError(msg)
# Used for Legacy Mode; this is the Web API Key retrieved from the
# User Panel
self.apikey = None
# Path to our Keyfile
self.keyfile = None
# Our Project ID is required to verify against the keyfile
# specified
self.project = None
# Initialize our Google OAuth module we can work with
self.oauth = GoogleOAuth(
user_agent=self.app_id, timeout=self.request_timeout,
verify_certificate=self.verify_certificate)
if self.mode == FCMMode.OAuth2:
# The project ID associated with the account
self.project = validate_regex(project)
if not self.project:
msg = 'An invalid FCM Project ID ' \
'({}) was specified.'.format(project)
self.logger.warning(msg)
raise TypeError(msg)
if not keyfile:
msg = 'No FCM JSON KeyFile was specified.'
self.logger.warning(msg)
raise TypeError(msg)
# Our keyfile object is just an AppriseAttachment object
self.keyfile = AppriseAttachment(asset=self.asset)
# Add our definition to our template
self.keyfile.add(keyfile)
# Enforce maximum file size
self.keyfile[0].max_file_size = self.max_fcm_keyfile_size
else: # Legacy Mode
# The apikey associated with the account
self.apikey = validate_regex(apikey)
if not self.apikey:
msg = 'An invalid FCM API key ' \
'({}) was specified.'.format(apikey)
self.logger.warning(msg)
raise TypeError(msg)
# Acquire Device IDs to notify
self.targets = parse_list(targets)
return
@property
def access_token(self):
"""
Generates a access_token based on the keyfile provided
"""
keyfile = self.keyfile[0]
if not keyfile:
# We could not access the keyfile
self.logger.error(
'Could not access FCM keyfile {}.'.format(
keyfile.url(privacy=True)))
return None
if not self.oauth.load(keyfile.path):
self.logger.error(
'FCM keyfile {} could not be loaded.'.format(
keyfile.url(privacy=True)))
return None
# Verify our project id against the one provided in our keyfile
if self.project != self.oauth.project_id:
self.logger.error(
'FCM keyfile {} identifies itself for a different project'
.format(keyfile.url(privacy=True)))
return None
# Return our generated key; the below returns None if a token could
# not be acquired
return self.oauth.access_token
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform FCM Notification
"""
if not self.targets:
# There is no one to email; we're done
self.logger.warning('There are no FCM devices or topics to notify')
return False
if self.mode == FCMMode.OAuth2:
access_token = self.access_token
if not access_token:
# Error message is generated in access_tokengen() so no reason
# to additionally write anything here
return False
headers = {
'User-Agent': self.app_id,
'Content-Type': 'application/json',
"Authorization": "Bearer {}".format(access_token),
}
# Prepare our notify URL
notify_url = self.notify_oauth2_url
else: # FCMMode.Legacy
headers = {
'User-Agent': self.app_id,
'Content-Type': 'application/json',
"Authorization": "key={}".format(self.apikey),
}
# Prepare our notify URL
notify_url = self.notify_legacy_url
has_error = False
# Create a copy of the targets list
targets = list(self.targets)
while len(targets):
recipient = targets.pop(0)
if self.mode == FCMMode.OAuth2:
payload = {
'message': {
'token': None,
'notification': {
'title': title,
'body': body,
}
}
}
if recipient[0] == '#':
payload['message']['topic'] = recipient[1:]
self.logger.debug(
"FCM recipient %s parsed as a topic",
recipient[1:])
else:
payload['message']['token'] = recipient
self.logger.debug(
"FCM recipient %s parsed as a device token",
recipient)
else: # FCMMode.Legacy
payload = {
'notification': {
'notification': {
'title': title,
'body': body,
}
}
}
if recipient[0] == '#':
payload['to'] = '/topics/{}'.format(recipient)
self.logger.debug(
"FCM recipient %s parsed as a topic",
recipient[1:])
else:
payload['to'] = recipient
self.logger.debug(
"FCM recipient %s parsed as a device token",
recipient)
self.logger.debug(
'FCM %s POST URL: %s (cert_verify=%r)',
self.mode, notify_url, self.verify_certificate,
)
self.logger.debug('FCM %s Payload: %s', self.mode, str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
notify_url.format(project=self.project),
data=dumps(payload),
headers=headers,
verify=self.verify_certificate,
timeout=self.request_timeout,
)
if r.status_code not in (
requests.codes.ok, requests.codes.no_content):
# We had a problem
status_str = \
NotifyBase.http_response_code_lookup(
r.status_code, FCM_HTTP_ERROR_MAP)
self.logger.warning(
'Failed to send {} FCM notification: '
'{}{}error={}.'.format(
self.mode,
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n%s', r.content)
has_error = True
else:
self.logger.info('Sent %s FCM notification.', self.mode)
except requests.RequestException as e:
self.logger.warning(
'A Connection error occurred sending FCM '
'notification.'
)
self.logger.debug('Socket Exception: %s', str(e))
has_error = True
return not has_error
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any URL parameters
params = {
'mode': self.mode,
}
if self.keyfile:
# Include our keyfile if specified
params['keyfile'] = NotifyFCM.quote(
self.keyfile[0].url(privacy=privacy), safe='')
# Extend our parameters
params.update(self.url_parameters(privacy=privacy, *args, **kwargs))
reference = NotifyFCM.quote(self.project) \
if self.mode == FCMMode.OAuth2 \
else self.pprint(self.apikey, privacy, safe='')
return '{schema}://{reference}/{targets}?{params}'.format(
schema=self.secure_protocol,
reference=reference,
targets='/'.join(
[NotifyFCM.quote(x) for x in self.targets]),
params=NotifyFCM.urlencode(params),
)
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results
# The apikey/project is stored in the hostname
results['apikey'] = NotifyFCM.unquote(results['host'])
results['project'] = results['apikey']
# Get our Device IDs
results['targets'] = NotifyFCM.split_path(results['fullpath'])
# Get our mode
results['mode'] = results['qsd'].get('mode')
# The 'to' makes it easier to use yaml configuration
if 'to' in results['qsd'] and len(results['qsd']['to']):
results['targets'] += \
NotifyFCM.parse_list(results['qsd']['to'])
# Our Project ID
if 'project' in results['qsd'] and results['qsd']['project']:
results['project'] = \
NotifyFCM.unquote(results['qsd']['project'])
# Our Web API Key
if 'apikey' in results['qsd'] and results['qsd']['apikey']:
results['apikey'] = \
NotifyFCM.unquote(results['qsd']['apikey'])
# Our Keyfile (JSON)
if 'keyfile' in results['qsd'] and results['qsd']['keyfile']:
results['keyfile'] = \
NotifyFCM.unquote(results['qsd']['keyfile'])
return results

View File

@ -0,0 +1,328 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2021 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# To generate a private key file for your service account:
#
# 1. In the Firebase console, open Settings > Service Accounts.
# 2. Click Generate New Private Key, then confirm by clicking Generate Key.
# 3. Securely store the JSON file containing the key.
import io
import requests
import base64
import json
import calendar
from cryptography.hazmat import backends
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import asymmetric
from cryptography.exceptions import UnsupportedAlgorithm
from datetime import datetime
from datetime import timedelta
from ...logger import logger
try:
# Python 2.7
from urllib import urlencode as _urlencode
except ImportError:
# Python 3.x
from urllib.parse import urlencode as _urlencode
try:
from json.decoder import JSONDecodeError
except ImportError:
# Python v2.7 Backwards Compatibility support
JSONDecodeError = ValueError
class GoogleOAuth(object):
"""
A OAuth simplified implimentation to Google's Firebase Cloud Messaging
"""
scopes = [
'https://www.googleapis.com/auth/firebase.messaging',
]
# 1 hour in seconds (the lifetime of our token)
access_token_lifetime_sec = timedelta(seconds=3600)
# The default URI to use if one is not found
default_token_uri = 'https://oauth2.googleapis.com/token'
# Taken right from google.auth.helpers:
clock_skew = timedelta(seconds=10)
def __init__(self, user_agent=None, timeout=(5, 4),
verify_certificate=True):
"""
Initialize our OAuth object
"""
# Wether or not to verify ssl
self.verify_certificate = verify_certificate
# Our (connect, read) timeout
self.request_timeout = timeout
# assign our user-agent if defined
self.user_agent = user_agent
# initialize our other object variables
self.__reset()
def __reset(self):
"""
Reset object internal variables
"""
# Google Keyfile Encoding
self.encoding = 'utf-8'
# Our retrieved JSON content (unmangled)
self.content = None
# Our generated key information we cache once loaded
self.private_key = None
# Our keys we build using the provided content
self.__refresh_token = None
self.__access_token = None
self.__access_token_expiry = datetime.utcnow()
def load(self, path):
"""
Generate our SSL details
"""
# Reset our objects
self.content = None
self.private_key = None
self.__access_token = None
self.__access_token_expiry = datetime.utcnow()
try:
with io.open(path, mode="r", encoding=self.encoding) as fp:
self.content = json.loads(fp.read())
except (OSError, IOError):
logger.debug('FCM keyfile {} could not be accessed'.format(path))
return False
except JSONDecodeError as e:
logger.debug(
'FCM keyfile {} generated a JSONDecodeError: {}'.format(
path, e))
return False
if not isinstance(self.content, dict):
logger.debug(
'FCM keyfile {} is incorrectly structured'.format(path))
self.__reset()
return False
# Verify we've got the correct tokens in our content to work with
is_valid = next((False for k in (
'client_email', 'private_key_id', 'private_key',
'type', 'project_id') if not self.content.get(k)), True)
if not is_valid:
logger.debug(
'FCM keyfile {} is missing required information'.format(path))
self.__reset()
return False
# Verify our service_account type
if self.content.get('type') != 'service_account':
logger.debug(
'FCM keyfile {} is not of type service_account'.format(path))
self.__reset()
return False
# Prepare our private key which is in PKCS8 PEM format
try:
self.private_key = serialization.load_pem_private_key(
self.content.get('private_key').encode(self.encoding),
password=None, backend=backends.default_backend())
except (TypeError, ValueError):
# ValueError: If the PEM data could not be decrypted or if its
# structure could not be decoded successfully.
# TypeError: If a password was given and the private key was
# not encrypted. Or if the key was encrypted but
# no password was supplied.
logger.error('FCM provided private key is invalid.')
self.__reset()
return False
except UnsupportedAlgorithm:
# If the serialized key is of a type that is not supported by
# the backend.
logger.error('FCM provided private key is not supported')
self.__reset()
return False
# We've done enough validation to move on
return True
@property
def access_token(self):
"""
Returns our access token (if it hasn't expired yet)
- if we do not have one we'll fetch one.
- if it expired, we'll renew it
- if a key simply can't be acquired, then we return None
"""
if not self.private_key or not self.content:
# invalid content (or not loaded)
logger.error(
'No FCM JSON keyfile content loaded to generate a access '
'token with.')
return None
if self.__access_token_expiry > datetime.utcnow():
# Return our no-expired key
return self.__access_token
# If we reach here we need to prepare our payload
token_uri = self.content.get('token_uri', self.default_token_uri)
service_email = self.content.get('client_email')
key_identifier = self.content.get('private_key_id')
# Generate our Assertion
now = datetime.utcnow()
expiry = now + self.access_token_lifetime_sec
payload = {
# The number of seconds since the UNIX epoch.
"iat": calendar.timegm(now.utctimetuple()),
"exp": calendar.timegm(expiry.utctimetuple()),
# The issuer must be the service account email.
"iss": service_email,
# The audience must be the auth token endpoint's URI
"aud": token_uri,
# Our token scopes
"scope": " ".join(self.scopes),
}
# JWT Details
header = {
'typ': 'JWT',
'alg': 'RS256' if isinstance(
self.private_key, asymmetric.rsa.RSAPrivateKey) else 'ES256',
# Key Identifier
'kid': key_identifier,
}
# Encodes base64 strings removing any padding characters.
segments = [
base64.urlsafe_b64encode(
json.dumps(header).encode(self.encoding)).rstrip(b"="),
base64.urlsafe_b64encode(
json.dumps(payload).encode(self.encoding)).rstrip(b"="),
]
signing_input = b".".join(segments)
signature = self.private_key.sign(
signing_input,
asymmetric.padding.PKCS1v15(),
hashes.SHA256(),
)
# Finally append our segment
segments.append(base64.urlsafe_b64encode(signature).rstrip(b"="))
assertion = b".".join(segments)
http_payload = _urlencode({
'assertion': assertion,
'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer',
})
http_headers = {
'Content-Type': 'application/x-www-form-urlencoded',
}
if self.user_agent:
http_headers['User-Agent'] = self.user_agent
logger.info('Refreshing FCM Access Token')
try:
r = requests.post(
token_uri,
data=http_payload,
headers=http_headers,
verify=self.verify_certificate,
timeout=self.request_timeout,
)
if r.status_code != requests.codes.ok:
# We had a problem
logger.warning(
'Failed to update FCM Access Token error={}.'
.format(r.status_code))
logger.debug(
'Response Details:\r\n%s', r.content)
return None
except requests.RequestException as e:
logger.warning(
'A Connection error occurred refreshing FCM '
'Access Token.'
)
logger.debug('Socket Exception: %s', str(e))
return None
# If we get here, we made our request successfully, now we need
# to parse out the data
response = json.loads(r.content)
self.__access_token = response['access_token']
self.__refresh_token = response.get(
'refresh_token', self.__refresh_token)
if 'expires_in' in response:
delta = timedelta(seconds=int(response['expires_in']))
self.__access_token_expiry = \
delta + datetime.utcnow() - self.clock_skew
else:
# Allow some grace before we expire
self.__access_token_expiry = expiry - self.clock_skew
logger.debug(
'Access Token successfully acquired: %s', self.__access_token)
# Return our token
return self.__access_token
@property
def project_id(self):
"""
Returns the project id found in the file
"""
return None if not self.content \
else self.content.get('project_id')

View File

@ -32,24 +32,13 @@ from ..common import NotifyType
from ..utils import validate_regex
from ..AppriseLocale import gettext_lazy as _
# Default our global support flag
CRYPTOGRAPHY_AVAILABLE = False
try:
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher
from cryptography.hazmat.primitives.ciphers import algorithms
from cryptography.hazmat.primitives.ciphers import modes
from cryptography.hazmat.backends import default_backend
from base64 import urlsafe_b64encode
import hashlib
CRYPTOGRAPHY_AVAILABLE = True
except ImportError:
# no problem; this just means the added encryption functionality isn't
# available. You can still send a SimplePush message
pass
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher
from cryptography.hazmat.primitives.ciphers import algorithms
from cryptography.hazmat.primitives.ciphers import modes
from cryptography.hazmat.backends import default_backend
from base64 import urlsafe_b64encode
import hashlib
class NotifySimplePush(NotifyBase):
@ -181,15 +170,6 @@ class NotifySimplePush(NotifyBase):
Perform SimplePush Notification
"""
# Encrypt Message (providing support is available)
if self.password and self.user and not CRYPTOGRAPHY_AVAILABLE:
# Provide the end user at least some notification that they're
# not getting what they asked for
self.logger.warning(
"Authenticated SimplePush Notifications are not supported by "
"this system; `pip install cryptography`.")
return False
headers = {
'User-Agent': self.app_id,
'Content-type': "application/x-www-form-urlencoded",
@ -200,7 +180,7 @@ class NotifySimplePush(NotifyBase):
'key': self.apikey,
}
if self.password and self.user and CRYPTOGRAPHY_AVAILABLE:
if self.password and self.user:
body = self._encrypt(body)
title = self._encrypt(title)
payload.update({

View File

@ -10,9 +10,6 @@ babel
# Plugin Dependencies
#
# Used for test cases on spush://
cryptography
# Provides xmpp:// support
sleekxmpp

View File

@ -1,3 +1,4 @@
cryptography
requests
requests-oauthlib
six

304
test/test_fcm_plugin.py Normal file
View File

@ -0,0 +1,304 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2021 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import io
import os
import mock
import requests
import json
from apprise import Apprise
from apprise import plugins
from apprise.plugins.NotifyFCM.oauth import GoogleOAuth
from cryptography.exceptions import UnsupportedAlgorithm
try:
from json.decoder import JSONDecodeError
except ImportError:
# Python v2.7 Backwards Compatibility support
JSONDecodeError = ValueError
# Test files for KeyFile Directory
PRIVATE_KEYFILE_DIR = os.path.join(os.path.dirname(__file__), 'var', 'fcm')
@mock.patch('requests.post')
def test_fcm_plugin(mock_post):
"""
API: NotifyFCM() General Checks
"""
# Valid Keyfile
path = os.path.join(PRIVATE_KEYFILE_DIR, 'service_account.json')
# Disable Throttling to speed testing
plugins.NotifyBase.request_rate_per_sec = 0
# Prepare a good response
response = mock.Mock()
response.content = json.dumps({
"access_token": "ya29.c.abcd",
"expires_in": 3599,
"token_type": "Bearer",
})
response.status_code = requests.codes.ok
mock_post.return_value = response
# Test having a valid keyfile, but not a valid project id match
obj = Apprise.instantiate(
'fcm://invalid_project_id/device/?keyfile={}'.format(str(path)))
# we'll fail as a result
assert obj.notify("test") is False
# Test our call count
assert mock_post.call_count == 0
# Now we test using a valid Project ID but we can't open our file
obj = Apprise.instantiate(
'fcm://mock-project-id/device/?keyfile={}'.format(str(path)))
with mock.patch('io.open', side_effect=OSError):
# we'll fail as a result
assert obj.notify("test") is False
# Test our call count
assert mock_post.call_count == 0
# Now we test using a valid Project ID
obj = Apprise.instantiate(
'fcm://mock-project-id/device/#topic/?keyfile={}'.format(str(path)))
# we'll fail as a result
assert obj.notify("test") is True
# Test our call count
assert mock_post.call_count == 3
assert mock_post.call_args_list[0][0][0] == \
'https://accounts.google.com/o/oauth2/token'
assert mock_post.call_args_list[1][0][0] == \
'https://fcm.googleapis.com/v1/projects/mock-project-id/messages:send'
assert mock_post.call_args_list[2][0][0] == \
'https://fcm.googleapis.com/v1/projects/mock-project-id/messages:send'
@mock.patch('requests.post')
def test_fcm_keyfile_parse(mock_post):
"""
API: NotifyFCM() KeyFile Tests
"""
# Prepare a good response
response = mock.Mock()
response.content = json.dumps({
"access_token": "ya29.c.abcd",
"expires_in": 3599,
"token_type": "Bearer",
})
response.status_code = requests.codes.ok
mock_post.return_value = response
path = os.path.join(PRIVATE_KEYFILE_DIR, 'service_account.json')
oauth = GoogleOAuth()
# We can not get an Access Token without content loaded
assert oauth.access_token is None
# Load our content
assert oauth.load(path) is True
assert oauth.access_token is not None
# Test our call count
assert mock_post.call_count == 1
assert mock_post.call_args_list[0][0][0] == \
'https://accounts.google.com/o/oauth2/token'
mock_post.reset_mock()
# a second call uses cache since our token hasn't expired yet
assert oauth.access_token is not None
assert mock_post.call_count == 0
# Same test case without expires_in entry
mock_post.reset_mock()
response.content = json.dumps({
"access_token": "ya29.c.abcd",
"token_type": "Bearer",
})
oauth = GoogleOAuth()
assert oauth.load(path) is True
assert oauth.access_token is not None
# Test our call count
assert mock_post.call_count == 1
assert mock_post.call_args_list[0][0][0] == \
'https://accounts.google.com/o/oauth2/token'
# Test user-agent override
mock_post.reset_mock()
oauth = GoogleOAuth(user_agent="test-agent-override")
assert oauth.load(path) is True
assert oauth.access_token is not None
assert mock_post.call_count == 1
assert mock_post.call_args_list[0][0][0] == \
'https://accounts.google.com/o/oauth2/token'
#
# Test some errors that can get thrown when trying to handle
# the service_account.json file
#
# Reset our object
mock_post.reset_mock()
# Now we test a case where we can't access the file we've been pointed to:
oauth = GoogleOAuth()
with mock.patch('io.open', side_effect=OSError):
# We will fail to retrieve our Access Token
assert oauth.load(path) is False
assert oauth.access_token is None
oauth = GoogleOAuth()
with mock.patch('json.loads', side_effect=([], )):
# We will fail to retrieve our Access Token since we did not parse
# a dictionary
assert oauth.load(path) is False
assert oauth.access_token is None
# Case where we can't load the PEM key:
oauth = GoogleOAuth()
with mock.patch(
'cryptography.hazmat.primitives.serialization'
'.load_pem_private_key',
side_effect=ValueError("")):
assert oauth.load(path) is False
assert oauth.access_token is None
# Case where we can't load the PEM key:
oauth = GoogleOAuth()
with mock.patch(
'cryptography.hazmat.primitives.serialization'
'.load_pem_private_key',
side_effect=TypeError("")):
assert oauth.load(path) is False
assert oauth.access_token is None
# Case where we can't load the PEM key:
oauth = GoogleOAuth()
with mock.patch(
'cryptography.hazmat.primitives.serialization'
'.load_pem_private_key',
side_effect=UnsupportedAlgorithm("")):
# Note: This test should be te
assert oauth.load(path) is False
assert oauth.access_token is None
# Not one call was made to the web
assert mock_post.call_count == 0
#
# Test some web errors that can occur when speaking upstream
# with Google to get our token generated
#
response.status_code = requests.codes.internal_server_error
mock_post.reset_mock()
oauth = GoogleOAuth()
assert oauth.load(path) is True
# We'll fail due to an bad web response
assert oauth.access_token is None
# Return our status code to how it was
response.status_code = requests.codes.ok
# No access token
bad_response_1 = mock.Mock()
bad_response_1.content = json.dumps({
"expires_in": 3599,
"token_type": "Bearer",
})
# Invalid JSON
bad_response_2 = mock.Mock()
bad_response_2.content = '{'
mock_post.return_value = None
# Throw an exception on the first call to requests.post()
for side_effect in (
requests.RequestException(), bad_response_1, bad_response_2):
mock_post.side_effect = side_effect
# Test all of our bad side effects
oauth = GoogleOAuth()
assert oauth.load(path) is True
# We'll fail due to an bad web response
assert oauth.access_token is None
def test_fcm_bad_keyfile_parse():
"""
API: NotifyFCM() KeyFile Bad Service Account Type Tests
"""
path = os.path.join(PRIVATE_KEYFILE_DIR, 'service_account-bad-type.json')
oauth = GoogleOAuth()
assert oauth.load(path) is False
def test_fcm_keyfile_missing_entries_parse(tmpdir):
"""
API: NotifyFCM() KeyFile Missing Entries Test
"""
# Prepare a base keyfile reference to use
path = os.path.join(PRIVATE_KEYFILE_DIR, 'service_account.json')
with io.open(path, mode="r", encoding='utf-8') as fp:
content = json.loads(fp.read())
path = tmpdir.join('fcm_keyfile.json')
# Test that we fail to load if the following keys are missing:
for entry in (
'client_email', 'private_key_id', 'private_key', 'type',
'project_id'):
# Ensure the key actually exists in our file
assert entry in content
# Create a copy of our content
content_copy = content.copy()
# Remove our entry we expect to validate against
del content_copy[entry]
assert entry not in content_copy
path.write(json.dumps(content_copy))
oauth = GoogleOAuth()
assert oauth.load(str(path)) is False
# Now write ourselves a bad JSON file
path.write('{')
oauth = GoogleOAuth()
assert oauth.load(str(path)) is False

View File

@ -604,50 +604,99 @@ TEST_URLS = (
# We failed to identify any valid authentication
'instance': TypeError,
}),
('fcm://apikey/', {
# no project id specified
'instance': TypeError,
}),
('fcm://project@%20%20/', {
# invalid apikey
'instance': TypeError,
}),
('fcm://project@apikey/', {
# No targets specified; we will initialize but not notify anything
('fcm://apikey/', {
# no project id specified so we operate in legacy mode
'instance': plugins.NotifyFCM,
# but there are no targets specified so we return False
'notify_response': False,
}),
('fcm://project@apikey/device', {
('fcm://apikey/device', {
# Valid device
'instance': plugins.NotifyFCM,
'privacy_url': 'fcm://project@a...y/device',
'privacy_url': 'fcm://a...y/device',
}),
('fcm://project@apikey/#topic', {
('fcm://apikey/#topic', {
# Valid topic
'instance': plugins.NotifyFCM,
'privacy_url': 'fcm://project@a...y/%23topic',
'privacy_url': 'fcm://a...y/%23topic',
}),
('fcm://project@apikey/#topic1/device/%20/', {
('fcm://apikey/device?mode=invalid', {
# Valid device, invalid mode
'instance': TypeError,
}),
('fcm://apikey/#topic1/device/%20/', {
# Valid topic, valid device, and invalid entry
'instance': plugins.NotifyFCM,
}),
('fcm://project@apikey?to=#topic1,device', {
('fcm://apikey?to=#topic1,device', {
# Test to=
'instance': plugins.NotifyFCM,
}),
('fcm://project@apikey/#topic1/device/', {
('fcm://?apikey=abc123&to=device', {
# Test apikey= to=
'instance': plugins.NotifyFCM,
}),
('fcm://%20?to=device&keyfile=/invalid/path', {
# invalid Project ID
'instance': TypeError,
}),
('fcm://project_id?to=device&keyfile=/invalid/path', {
# Test to= and auto detection of oauth mode
'instance': plugins.NotifyFCM,
# we'll fail to send our notification as a result
'response': False,
}),
('fcm://?to=device&project=project_id&keyfile=/invalid/path', {
# Test project= & to= and auto detection of oauth mode
'instance': plugins.NotifyFCM,
# we'll fail to send our notification as a result
'response': False,
}),
('fcm://project_id?to=device&mode=oauth2', {
# no keyfile was specified
'instance': TypeError,
}),
('fcm://project_id?to=device&mode=oauth2&keyfile=/invalid/path', {
# Same test as above except we explicitly set our oauth2 mode
# Test to= and auto detection of oauth mode
'instance': plugins.NotifyFCM,
# we'll fail to send our notification as a result
'response': False,
}),
('fcm://apikey/#topic1/device/?mode=legacy', {
'instance': plugins.NotifyFCM,
# throw a bizzare code forcing us to fail to look it up
'response': False,
'requests_response_code': 999,
}),
('fcm://project@apikey/#topic1/device/', {
('fcm://apikey/#topic1/device/?mode=legacy', {
'instance': plugins.NotifyFCM,
# Throws a series of connection and transfer exceptions when this flag
# is set and tests that we gracfully handle them
'test_requests_exceptions': True,
}),
('fcm://project/#topic1/device/?mode=oauth2&keyfile=file://{}'.format(
os.path.join(
os.path.dirname(__file__), 'var', 'fcm',
'service_account.json')), {
'instance': plugins.NotifyFCM,
# throw a bizzare code forcing us to fail to look it up
'response': False,
'requests_response_code': 999,
}),
('fcm://projectid/#topic1/device/?mode=oauth2&keyfile=file://{}'.format(
os.path.join(
os.path.dirname(__file__), 'var', 'fcm',
'service_account.json')), {
'instance': plugins.NotifyFCM,
# Throws a series of connection and transfer exceptions when
# this flag is set and tests that we gracfully handle them
'test_requests_exceptions': True,
}),
##################################
# NotifyFlock
##################################

View File

@ -1,93 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import os
import sys
import pytest
import apprise
try:
# Python v3.4+
from importlib import reload
except ImportError:
try:
# Python v3.0-v3.3
from imp import reload
except ImportError:
# Python v2.7
pass
# Disable logging for a cleaner testing output
import logging
logging.disable(logging.CRITICAL)
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="requires cryptography")
def test_simplepush_plugin(tmpdir):
"""
API: NotifySimplePush Plugin()
"""
suite = tmpdir.mkdir("simplepush")
suite.join("__init__.py").write('')
module_name = 'cryptography'
suite.join("{}.py".format(module_name)).write('raise ImportError()')
# Update our path to point to our new test suite
sys.path.insert(0, str(suite))
for name in list(sys.modules.keys()):
if name.startswith('{}.'.format(module_name)):
del sys.modules[name]
del sys.modules[module_name]
# The following libraries need to be reloaded to prevent
# TypeError: super(type, obj): obj must be an instance or subtype of type
# This is better explained in this StackOverflow post:
# https://stackoverflow.com/questions/31363311/\
# any-way-to-manually-fix-operation-of-\
# super-after-ipython-reload-avoiding-ty
#
reload(sys.modules['apprise.plugins.NotifySimplePush'])
reload(sys.modules['apprise.plugins'])
reload(sys.modules['apprise.Apprise'])
reload(sys.modules['apprise'])
# Without the cryptography library objects can still be instantiated
# however notifications will fail
obj = apprise.Apprise.instantiate('spush://salt:pass@valid_api_key')
assert obj is not None
# We can't notify with a user/pass combo and no cryptography library
assert obj.notify(
title="test message title", body="message body") is False
# Tidy-up / restore things to how they were
os.unlink(str(suite.join("{}.py".format(module_name))))
reload(sys.modules['apprise.plugins.NotifySimplePush'])
reload(sys.modules['apprise.plugins'])
reload(sys.modules['apprise.Apprise'])
reload(sys.modules['apprise'])

View File

@ -0,0 +1,12 @@
{
"type": "bad_type",
"project_id": "mock-project-id",
"private_key_id": "mock-key-id-1",
"private_key": "-----BEGIN RSA PRIVATE KEY-----\nMIIEpAIBAAKCAQEAwJENcRev+eXZKvhhWLiV3Lz2MvO+naQRHo59g3vaNQnbgyduN/L4krlr\nJ5c6FiikXdtJNb/QrsAHSyJWCu8j3T9CruiwbidGAk2W0RuViTVspjHUTsIHExx9euWM0Uom\nGvYkoqXahdhPL/zViVSJt+Rt8bHLsMvpb8RquTIb9iKY3SMV2tCofNmyCSgVbghq/y7lKORt\nV/IRguWs6R22fbkb0r2MCYoNAbZ9dqnbRIFNZBC7itYtUoTEresRWcyFMh0zfAIJycWOJlVL\nDLqkY2SmIx8u7fuysCg1wcoSZoStuDq02nZEMw1dx8HGzE0hynpHlloRLByuIuOAfMCCYwID\nAQABAoIBADFtihu7TspAO0wSUTpqttzgC/nsIsNn95T2UjVLtyjiDNxPZLUrwq42tdCFur0x\nVW9Z+CK5x6DzXWvltlw8IeKKeF1ZEOBVaFzy+YFXKTz835SROcO1fgdjyrme7lRSShGlmKW/\nGKY+baUNquoDLw5qreXaE0SgMp0jt5ktyYuVxvhLDeV4omw2u6waoGkifsGm8lYivg5l3VR7\nw2IVOvYZTt4BuSYVwOM+qjwaS1vtL7gv0SUjrj85Ja6zERRdFiITDhZw6nsvacr9/+/aut9E\naL/koSSb62g5fntQMEwoT4hRnjPnAedmorM9Rhddh2TB3ZKTBbMN1tUk3fJxOuECgYEA+z6l\neSaAcZ3qvwpntcXSpwwJ0SSmzLTH2RJNf+Ld3eBHiSvLTG53dWB7lJtF4R1KcIwf+KGcOFJv\nsnepzcZBylRvT8RrAAkV0s9OiVm1lXZyaepbLg4GGFJBPi8A6VIAj7zYknToRApdW0s1x/XX\nChewfJDckqsevTMovdbg8YkCgYEAxDYX+3mfvv/opo6HNNY3SfVunM+4vVJL+n8gWZ2w9kz3\nQ9Ub9YbRmI7iQaiVkO5xNuoG1n9bM+3Mnm84aQ1YeNT01YqeyQsipP5Wi+um0PzYTaBw9RO+\n8Gh6992OwlJiRtFk5WjalNWOxY4MU0ImnJwIfKQlUODvLmcixm68NYsCgYEAuAqI3jkk55Vd\nKvotREsX5wP7gPePM+7NYiZ1HNQL4Ab1f/bTojZdTV8Sx6YCR0fUiqMqnE+OBvfkGGBtw22S\nLesx6sWf99Ov58+x4Q0U5dpxL0Lb7d2Z+2Dtp+Z4jXFjNeeI4ae/qG/LOR/b0pE0J5F415ap\n7Mpq5v89vepUtrkCgYAjMXytu4v+q1Ikhc4UmRPDrUUQ1WVSd+9u19yKlnFGTFnRjej86hiw\nH3jPxBhHra0a53EgiilmsBGSnWpl1WH4EmJz5vBCKUAmjgQiBrueIqv9iHiaTNdjsanUyaWw\njyxXfXl2eI80QPXh02+8g1H/pzESgjK7Rg1AqnkfVH9nrwKBgQDJVxKBPTw9pigYMVt9iHrR\niCl9zQVjRMbWiPOc0J56+/5FZYm/AOGl9rfhQ9vGxXZYZiOP5FsNkwt05Y1UoAAH4B4VQwbL\nqod71qOcI0ywgZiIR87CYw40gzRfjWnN+YEEW1qfyoNLilEwJB8iB/T+ZePHGmJ4MmQ/cTn9\nxpdLXA==\n-----END RSA PRIVATE KEY-----",
"client_email": "mock-email@mock-project.iam.gserviceaccount.com",
"client_id": "1234567890",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://accounts.google.com/o/oauth2/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/mock-project-id.iam.gserviceaccount.com"
}

View File

@ -0,0 +1,12 @@
{
"type": "service_account",
"project_id": "mock-project-id",
"private_key_id": "mock-key-id-1",
"private_key": "-----BEGIN RSA PRIVATE KEY-----\nMIIEpAIBAAKCAQEAwJENcRev+eXZKvhhWLiV3Lz2MvO+naQRHo59g3vaNQnbgyduN/L4krlr\nJ5c6FiikXdtJNb/QrsAHSyJWCu8j3T9CruiwbidGAk2W0RuViTVspjHUTsIHExx9euWM0Uom\nGvYkoqXahdhPL/zViVSJt+Rt8bHLsMvpb8RquTIb9iKY3SMV2tCofNmyCSgVbghq/y7lKORt\nV/IRguWs6R22fbkb0r2MCYoNAbZ9dqnbRIFNZBC7itYtUoTEresRWcyFMh0zfAIJycWOJlVL\nDLqkY2SmIx8u7fuysCg1wcoSZoStuDq02nZEMw1dx8HGzE0hynpHlloRLByuIuOAfMCCYwID\nAQABAoIBADFtihu7TspAO0wSUTpqttzgC/nsIsNn95T2UjVLtyjiDNxPZLUrwq42tdCFur0x\nVW9Z+CK5x6DzXWvltlw8IeKKeF1ZEOBVaFzy+YFXKTz835SROcO1fgdjyrme7lRSShGlmKW/\nGKY+baUNquoDLw5qreXaE0SgMp0jt5ktyYuVxvhLDeV4omw2u6waoGkifsGm8lYivg5l3VR7\nw2IVOvYZTt4BuSYVwOM+qjwaS1vtL7gv0SUjrj85Ja6zERRdFiITDhZw6nsvacr9/+/aut9E\naL/koSSb62g5fntQMEwoT4hRnjPnAedmorM9Rhddh2TB3ZKTBbMN1tUk3fJxOuECgYEA+z6l\neSaAcZ3qvwpntcXSpwwJ0SSmzLTH2RJNf+Ld3eBHiSvLTG53dWB7lJtF4R1KcIwf+KGcOFJv\nsnepzcZBylRvT8RrAAkV0s9OiVm1lXZyaepbLg4GGFJBPi8A6VIAj7zYknToRApdW0s1x/XX\nChewfJDckqsevTMovdbg8YkCgYEAxDYX+3mfvv/opo6HNNY3SfVunM+4vVJL+n8gWZ2w9kz3\nQ9Ub9YbRmI7iQaiVkO5xNuoG1n9bM+3Mnm84aQ1YeNT01YqeyQsipP5Wi+um0PzYTaBw9RO+\n8Gh6992OwlJiRtFk5WjalNWOxY4MU0ImnJwIfKQlUODvLmcixm68NYsCgYEAuAqI3jkk55Vd\nKvotREsX5wP7gPePM+7NYiZ1HNQL4Ab1f/bTojZdTV8Sx6YCR0fUiqMqnE+OBvfkGGBtw22S\nLesx6sWf99Ov58+x4Q0U5dpxL0Lb7d2Z+2Dtp+Z4jXFjNeeI4ae/qG/LOR/b0pE0J5F415ap\n7Mpq5v89vepUtrkCgYAjMXytu4v+q1Ikhc4UmRPDrUUQ1WVSd+9u19yKlnFGTFnRjej86hiw\nH3jPxBhHra0a53EgiilmsBGSnWpl1WH4EmJz5vBCKUAmjgQiBrueIqv9iHiaTNdjsanUyaWw\njyxXfXl2eI80QPXh02+8g1H/pzESgjK7Rg1AqnkfVH9nrwKBgQDJVxKBPTw9pigYMVt9iHrR\niCl9zQVjRMbWiPOc0J56+/5FZYm/AOGl9rfhQ9vGxXZYZiOP5FsNkwt05Y1UoAAH4B4VQwbL\nqod71qOcI0ywgZiIR87CYw40gzRfjWnN+YEEW1qfyoNLilEwJB8iB/T+ZePHGmJ4MmQ/cTn9\nxpdLXA==\n-----END RSA PRIVATE KEY-----",
"client_email": "mock-email@mock-project.iam.gserviceaccount.com",
"client_id": "1234567890",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://accounts.google.com/o/oauth2/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/mock-project-id.iam.gserviceaccount.com"
}