mirror of https://github.com/caronc/apprise
Added SendPulse Support
parent
e3e34c4211
commit
e5e7ab0784
|
@ -0,0 +1,624 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# BSD 2-Clause License
|
||||
#
|
||||
# Apprise - Push Notification Library.
|
||||
# Copyright (c) 2024, Chris Caron <lead2gold@gmail.com>
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without
|
||||
# modification, are permitted provided that the following conditions are met:
|
||||
#
|
||||
# 1. Redistributions of source code must retain the above copyright notice,
|
||||
# this list of conditions and the following disclaimer.
|
||||
#
|
||||
# 2. Redistributions in binary form must reproduce the above copyright notice,
|
||||
# this list of conditions and the following disclaimer in the documentation
|
||||
# and/or other materials provided with the distribution.
|
||||
#
|
||||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
||||
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
|
||||
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
||||
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
||||
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
||||
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||||
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||||
# POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
# Simple API Reference:
|
||||
# - https://sendpulse.com/integrations/api/smtp
|
||||
|
||||
import requests
|
||||
from json import dumps
|
||||
import base64
|
||||
|
||||
from .base import NotifyBase
|
||||
from .. import exception
|
||||
from ..common import NotifyFormat
|
||||
from ..common import NotifyType
|
||||
from ..utils import parse_list
|
||||
from ..utils import is_email
|
||||
from ..utils import parse_emails
|
||||
from ..conversion import convert_between
|
||||
from ..utils import validate_regex
|
||||
from ..locale import gettext_lazy as _
|
||||
from email.utils import formataddr
|
||||
|
||||
|
||||
class NotifySendPulse(NotifyBase):
|
||||
"""
|
||||
A wrapper for Notify SendPulse Notifications
|
||||
"""
|
||||
|
||||
# The default descriptive name associated with the Notification
|
||||
service_name = 'SendPulse'
|
||||
|
||||
# The services URL
|
||||
service_url = 'https://sendpulse.com'
|
||||
|
||||
# The default secure protocol
|
||||
secure_protocol = 'sendpulse'
|
||||
|
||||
# A URL that takes you to the setup/help of the specific protocol
|
||||
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_sendpulse'
|
||||
|
||||
# Default to markdown
|
||||
notify_format = NotifyFormat.HTML
|
||||
|
||||
# The default Email API URL to use
|
||||
notify_email_url = 'https://api.sendpulse.com/smtp/emails'
|
||||
|
||||
# Support attachments
|
||||
attachment_support = True
|
||||
|
||||
# Allow 300 requests per minute.
|
||||
# 60/300 = 0.2
|
||||
request_rate_per_sec = 0.2
|
||||
|
||||
# Support attachments
|
||||
attachment_support = True
|
||||
|
||||
# The default subject to use if one isn't specified.
|
||||
default_empty_subject = '<no subject>'
|
||||
|
||||
# Define object templates
|
||||
templates = (
|
||||
'{schema}://{from_email}/{client_id}/{client_secret}/',
|
||||
'{schema}://{from_email}/{client_id}/{client_secret}/{targets}',
|
||||
)
|
||||
|
||||
# Define our template arguments
|
||||
template_tokens = dict(NotifyBase.template_tokens, **{
|
||||
'from_email': {
|
||||
'name': _('Source Email'),
|
||||
'type': 'string',
|
||||
'required': True,
|
||||
},
|
||||
'client_id': {
|
||||
'name': _('Client ID'),
|
||||
'type': 'string',
|
||||
'required': True,
|
||||
'private': True,
|
||||
'regex': (r'^[A-Z0-9._-]+$', 'i'),
|
||||
},
|
||||
'client_secret': {
|
||||
'name': _('Client Secret'),
|
||||
'type': 'string',
|
||||
'required': True,
|
||||
'private': True,
|
||||
'regex': (r'^[A-Z0-9._-]+$', 'i'),
|
||||
},
|
||||
'target_email': {
|
||||
'name': _('Target Email'),
|
||||
'type': 'string',
|
||||
'map_to': 'targets',
|
||||
},
|
||||
'targets': {
|
||||
'name': _('Targets'),
|
||||
'type': 'list:string',
|
||||
},
|
||||
})
|
||||
|
||||
# Define our template arguments
|
||||
template_args = dict(NotifyBase.template_args, **{
|
||||
'to': {
|
||||
'alias_of': 'targets',
|
||||
},
|
||||
'from': {
|
||||
'name': _('From Email'),
|
||||
'type': 'string',
|
||||
'map_to': 'from_email',
|
||||
},
|
||||
'cc': {
|
||||
'name': _('Carbon Copy'),
|
||||
'type': 'list:string',
|
||||
},
|
||||
'bcc': {
|
||||
'name': _('Blind Carbon Copy'),
|
||||
'type': 'list:string',
|
||||
},
|
||||
'template': {
|
||||
# The template ID is an integer
|
||||
'name': _('Template ID'),
|
||||
'type': 'int',
|
||||
},
|
||||
'id': {
|
||||
'alias_of': 'client_id',
|
||||
},
|
||||
'secret': {
|
||||
'alias_of': 'client_secret',
|
||||
}
|
||||
})
|
||||
|
||||
# Support Template Dynamic Variables (Substitutions)
|
||||
template_kwargs = {
|
||||
'template_data': {
|
||||
'name': _('Template Data'),
|
||||
'prefix': '+',
|
||||
},
|
||||
}
|
||||
|
||||
def __init__(self, from_email, client_id, client_secret, targets=None,
|
||||
cc=None, bcc=None, template=None,
|
||||
template_data=None, **kwargs):
|
||||
"""
|
||||
Initialize Notify SendPulse Object
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
|
||||
# Client ID
|
||||
self.client_id = validate_regex(
|
||||
client_id, *self.template_tokens['client_id']['regex'])
|
||||
if not self.client_id:
|
||||
msg = 'An invalid SendPulse Client ID ' \
|
||||
'({}) was specified.'.format(client_id)
|
||||
self.logger.warning(msg)
|
||||
raise TypeError(msg)
|
||||
|
||||
# Client Secret
|
||||
self.client_secret = validate_regex(
|
||||
client_secret, *self.template_tokens['client_secret']['regex'])
|
||||
if not self.client_secret:
|
||||
msg = 'An invalid SendPulse Client Secret ' \
|
||||
'({}) was specified.'.format(client_secret)
|
||||
self.logger.warning(msg)
|
||||
raise TypeError(msg)
|
||||
|
||||
result = is_email(from_email)
|
||||
if not result:
|
||||
msg = 'Invalid ~From~ email specified: {}'.format(from_email)
|
||||
self.logger.warning(msg)
|
||||
raise TypeError(msg)
|
||||
|
||||
# Tracks emails to name lookups (if they exist)
|
||||
self.__email_map = {}
|
||||
|
||||
# Store from email address
|
||||
self.from_email = result['full_email']
|
||||
self.__email_map[self.from_email] = result['name'] \
|
||||
if result['name'] else self.app_id
|
||||
|
||||
# Acquire Targets (To Emails)
|
||||
self.targets = list()
|
||||
|
||||
# Acquire Carbon Copies
|
||||
self.cc = set()
|
||||
|
||||
# Acquire Blind Carbon Copies
|
||||
self.bcc = set()
|
||||
|
||||
if template:
|
||||
try:
|
||||
# Store our template
|
||||
self.template = int(template)
|
||||
|
||||
except (TypeError, ValueError):
|
||||
# Not a valid integer; ignore entry
|
||||
err = 'The SendPulse Template ID specified ({}) is invalid.'\
|
||||
.format(template)
|
||||
self.logger.warning(err)
|
||||
raise TypeError(err)
|
||||
|
||||
# Now our dynamic template data (if defined)
|
||||
self.template_data = template_data \
|
||||
if isinstance(template_data, dict) else {}
|
||||
|
||||
if targets:
|
||||
# Validate recipients (to:) and drop bad ones:
|
||||
for recipient in parse_emails(targets):
|
||||
result = is_email(recipient)
|
||||
if result:
|
||||
self.targets.append(
|
||||
(result['name'] if result['name'] else False,
|
||||
result['full_email']))
|
||||
continue
|
||||
|
||||
self.logger.warning(
|
||||
'Dropped invalid To email '
|
||||
'({}) specified.'.format(recipient),
|
||||
)
|
||||
|
||||
else:
|
||||
# If our target email list is empty we want to add ourselves to it
|
||||
self.targets.append(self.from_email)
|
||||
|
||||
# Validate recipients (cc:) and drop bad ones:
|
||||
for recipient in parse_list(cc):
|
||||
|
||||
result = is_email(recipient)
|
||||
if result:
|
||||
self.cc.add(result['full_email'])
|
||||
if result['name']:
|
||||
self.__email_lookup[result['full_email']] = result['name']
|
||||
continue
|
||||
|
||||
self.logger.warning(
|
||||
'Dropped invalid Carbon Copy email '
|
||||
'({}) specified.'.format(recipient),
|
||||
)
|
||||
|
||||
# Validate recipients (bcc:) and drop bad ones:
|
||||
for recipient in parse_list(bcc):
|
||||
|
||||
result = is_email(recipient)
|
||||
if result:
|
||||
self.bcc.add(result['full_email'])
|
||||
if result['name']:
|
||||
self.__email_lookup[result['full_email']] = result['name']
|
||||
continue
|
||||
|
||||
self.logger.warning(
|
||||
'Dropped invalid Blind Carbon Copy email '
|
||||
'({}) specified.'.format(recipient),
|
||||
)
|
||||
|
||||
if len(self.targets) == 0:
|
||||
# Notify ourselves
|
||||
self.targets.append(self.from_email)
|
||||
|
||||
return
|
||||
|
||||
@property
|
||||
def url_identifier(self):
|
||||
"""
|
||||
Returns all of the identifiers that make this URL unique from
|
||||
another simliar one. Targets or end points should never be identified
|
||||
here.
|
||||
"""
|
||||
return (self.secure_protocol, self.client_id, self.client_secret)
|
||||
|
||||
def url(self, privacy=False, *args, **kwargs):
|
||||
"""
|
||||
Returns the URL built dynamically based on specified arguments.
|
||||
"""
|
||||
|
||||
# Our URL parameters
|
||||
params = self.url_parameters(privacy=privacy, *args, **kwargs)
|
||||
|
||||
if len(self.cc) > 0:
|
||||
# Handle our Carbon Copy Addresses
|
||||
params['cc'] = ','.join([
|
||||
formataddr(
|
||||
(self.__email_lookup[e]
|
||||
if e in self.__email_lookup else False, e),
|
||||
# Swap comma for it's escaped url code (if detected) since
|
||||
# we're using that as a delimiter
|
||||
charset='utf-8').replace(',', '%2C')
|
||||
for e in self.cc])
|
||||
|
||||
if len(self.bcc) > 0:
|
||||
# Handle our Blind Carbon Copy Addresses
|
||||
params['bcc'] = ','.join([
|
||||
formataddr(
|
||||
(self.__email_lookup[e]
|
||||
if e in self.__email_lookup else False, e),
|
||||
# Swap comma for it's escaped url code (if detected) since
|
||||
# we're using that as a delimiter
|
||||
charset='utf-8').replace(',', '%2C')
|
||||
for e in self.bcc])
|
||||
|
||||
if self.template:
|
||||
# Handle our Template ID if if was specified
|
||||
params['template'] = self.template
|
||||
|
||||
# Append our template_data into our parameter list
|
||||
params.update(
|
||||
{'+{}'.format(k): v for k, v in self.template_data.items()})
|
||||
|
||||
# a simple boolean check as to whether we display our target emails
|
||||
# or not
|
||||
has_targets = \
|
||||
not (len(self.targets) == 1 and self.targets[0] == self.from_email)
|
||||
|
||||
return '{schema}://{source}/{cid}/{secret}/{targets}?{params}'.format(
|
||||
schema=self.secure_protocol,
|
||||
source=self.from_email,
|
||||
cid=self.pprint(self.client_id, privacy, safe=''),
|
||||
secret=self.pprint(self.client_secret, privacy, safe=''),
|
||||
targets='' if not has_targets else '/'.join(
|
||||
[NotifySendPulse.quote(x, safe='') for x in self.targets]),
|
||||
params=NotifySendPulse.urlencode(params),
|
||||
)
|
||||
|
||||
def __len__(self):
|
||||
"""
|
||||
Returns the number of targets associated with this notification
|
||||
"""
|
||||
return len(self.targets)
|
||||
|
||||
def send(self, body, title='', notify_type=NotifyType.INFO, attach=None,
|
||||
**kwargs):
|
||||
"""
|
||||
Perform SendPulse Notification
|
||||
"""
|
||||
|
||||
headers = {
|
||||
'User-Agent': self.app_id,
|
||||
'Content-Type': 'application/json',
|
||||
'Authorization': 'Bearer {}'.format(self.apikey),
|
||||
}
|
||||
|
||||
# error tracking (used for function return)
|
||||
has_error = False
|
||||
|
||||
# A Simple Email Payload Template
|
||||
_payload = {
|
||||
'email': {
|
||||
'from': {
|
||||
'name': self.from_email[0],
|
||||
'email': self.from_email[1],
|
||||
},
|
||||
# To is populated further on
|
||||
'to': [],
|
||||
# A subject is a requirement, so if none is specified we must
|
||||
# set a default with at least 1 character or SendPulse will
|
||||
# deny our request
|
||||
'subject': title if title else self.default_empty_subject,
|
||||
}
|
||||
}
|
||||
|
||||
# Prepare Email Message
|
||||
if self.notify_format == NotifyFormat.HTML:
|
||||
# HTML
|
||||
_payload['email'].update({
|
||||
'text': convert_between(
|
||||
NotifyFormat.HTML, NotifyFormat.TEXT, body),
|
||||
'html': base64.b64encode(body.encode('utf-8')).decode('ascii'),
|
||||
})
|
||||
|
||||
else: # Text
|
||||
_payload['email']['text'] = body
|
||||
|
||||
if attach and self.attachment_support:
|
||||
attachments = {}
|
||||
|
||||
# Send our attachments
|
||||
for no, attachment in enumerate(attach, start=1):
|
||||
# Perform some simple error checking
|
||||
if not attachment:
|
||||
# We could not access the attachment
|
||||
self.logger.error(
|
||||
'Could not access SendPulse attachment {}.'.format(
|
||||
attachment.url(privacy=True)))
|
||||
return False
|
||||
|
||||
try:
|
||||
attachments[
|
||||
attachment.name if attachment.name
|
||||
else f'file{no:03}.dat'] = attachment.base64()
|
||||
|
||||
except exception.AppriseException:
|
||||
# We could not access the attachment
|
||||
self.logger.error(
|
||||
'Could not access SendPulse attachment {}.'.format(
|
||||
attachment.url(privacy=True)))
|
||||
return False
|
||||
|
||||
self.logger.debug(
|
||||
'Appending SendPulse attachment {}'.format(
|
||||
attachment.url(privacy=True)))
|
||||
|
||||
# Append our attachments to the payload
|
||||
_payload['email'].update({
|
||||
'attachments_binary': attachments,
|
||||
})
|
||||
|
||||
if self.template:
|
||||
_payload['email'].update({
|
||||
'template': {
|
||||
'id': self.template,
|
||||
'variables': self.template_data,
|
||||
}})
|
||||
|
||||
targets = list(self.targets)
|
||||
while len(targets) > 0:
|
||||
target = targets.pop(0)
|
||||
|
||||
# Create a copy of our template
|
||||
payload = _payload.copy()
|
||||
|
||||
# the cc, bcc, to field must be unique or SendMail will fail, the
|
||||
# below code prepares this by ensuring the target isn't in the cc
|
||||
# list or bcc list. It also makes sure the cc list does not contain
|
||||
# any of the bcc entries
|
||||
cc = (self.cc - self.bcc - set([target]))
|
||||
bcc = (self.bcc - set([target]))
|
||||
|
||||
#
|
||||
# prepare our 'to'
|
||||
#
|
||||
to = {
|
||||
'email': target
|
||||
}
|
||||
if target in self.__email_lookup:
|
||||
to['name'] = self.__email_lookup[target]
|
||||
|
||||
# Set our target
|
||||
payload['email']['to'] = [to]
|
||||
|
||||
if len(cc):
|
||||
payload['email']['cc'] = []
|
||||
for email in cc:
|
||||
item = {
|
||||
'email': email,
|
||||
}
|
||||
if email in self.__email_lookup:
|
||||
item['name'] = self.__email_lookup[email]
|
||||
|
||||
payload['email']['cc'].append(item)
|
||||
|
||||
if len(bcc):
|
||||
payload['email']['bcc'] = []
|
||||
for email in bcc:
|
||||
item = {
|
||||
'email': email,
|
||||
}
|
||||
if email in self.__email_lookup:
|
||||
item['name'] = self.__email_lookup[email]
|
||||
|
||||
payload['email']['bcc'].append(item)
|
||||
|
||||
self.logger.debug('SendPulse POST URL: %s (cert_verify=%r)' % (
|
||||
self.notify_email_url, self.verify_certificate,
|
||||
))
|
||||
self.logger.debug('SendPulse Payload: %s' % str(payload))
|
||||
|
||||
# Always call throttle before any remote server i/o is made
|
||||
self.throttle()
|
||||
try:
|
||||
r = requests.post(
|
||||
self.notify_email_url,
|
||||
data=dumps(payload),
|
||||
headers=headers,
|
||||
verify=self.verify_certificate,
|
||||
timeout=self.request_timeout,
|
||||
)
|
||||
if r.status_code not in (
|
||||
requests.codes.ok, requests.codes.accepted):
|
||||
# We had a problem
|
||||
status_str = \
|
||||
NotifySendPulse.http_response_code_lookup(
|
||||
r.status_code)
|
||||
|
||||
self.logger.warning(
|
||||
'Failed to send SendPulse notification to {}: '
|
||||
'{}{}error={}.'.format(
|
||||
target,
|
||||
status_str,
|
||||
', ' if status_str else '',
|
||||
r.status_code))
|
||||
|
||||
self.logger.debug(
|
||||
'Response Details:\r\n{}'.format(r.content))
|
||||
|
||||
# Mark our failure
|
||||
has_error = True
|
||||
continue
|
||||
|
||||
else:
|
||||
self.logger.info(
|
||||
'Sent SendPulse notification to {}.'.format(target))
|
||||
|
||||
except requests.RequestException as e:
|
||||
self.logger.warning(
|
||||
'A Connection error occurred sending SendPulse '
|
||||
'notification to {}.'.format(target))
|
||||
self.logger.debug('Socket Exception: %s' % str(e))
|
||||
|
||||
# Mark our failure
|
||||
has_error = True
|
||||
continue
|
||||
|
||||
return not has_error
|
||||
|
||||
@staticmethod
|
||||
def parse_url(url):
|
||||
"""
|
||||
Parses the URL and returns enough arguments that can allow
|
||||
us to re-instantiate this object.
|
||||
|
||||
"""
|
||||
|
||||
results = NotifyBase.parse_url(url)
|
||||
if not results:
|
||||
# We're done early as we couldn't load the results
|
||||
return results
|
||||
|
||||
# Our URL looks like this:
|
||||
# {schema}://{from_email}:{client_id}/{client_secret}/{targets}
|
||||
#
|
||||
# which actually equates to:
|
||||
# {schema}://{user}@{host}/{client_id}/{client_secret}
|
||||
# /{email1}/{email2}/etc..
|
||||
# ^ ^
|
||||
# | |
|
||||
# -from addr-
|
||||
|
||||
# Get our potential email targets
|
||||
# First 2 elements are the client_id and client_secret
|
||||
results['targets'] = NotifySendPulse.split_path(results['fullpath'])
|
||||
# check for our client id
|
||||
if 'id' in results['qsd'] and len(results['qsd']['id']):
|
||||
# Store our Client ID
|
||||
results['client_id'] = \
|
||||
NotifySendPulse.unquote(results['qsd']['id'])
|
||||
|
||||
else:
|
||||
# Store our Client ID
|
||||
results['client_id'] = results['targets'].pop(0)
|
||||
|
||||
if 'secret' in results['qsd'] and len(results['qsd']['secret']):
|
||||
# Store our Client Secret
|
||||
results['client_secret'] = \
|
||||
NotifySendPulse.unquote(results['qsd']['secret'])
|
||||
|
||||
else:
|
||||
# Store our Client Secret
|
||||
results['client_secret'] = results['targets'].pop(0)
|
||||
|
||||
if 'from' in results['qsd'] and len(results['qsd']['from']):
|
||||
results['from_email'] = \
|
||||
NotifySendPulse.unquote(results['qsd']['from_email'])
|
||||
|
||||
# This means any user@host is the To Address if defined
|
||||
if results.get('user') and results.get('host'):
|
||||
results['targets'] += '{}@{}'.format(
|
||||
NotifySendPulse.unquote(
|
||||
results['password']
|
||||
if results['password'] else results['user']),
|
||||
NotifySendPulse.unquote(results['host']),
|
||||
)
|
||||
|
||||
elif results.get('user') and results.get('host'):
|
||||
results['from_email'] = '{}@{}'.format(
|
||||
NotifySendPulse.unquote(
|
||||
results['password']
|
||||
if results['password'] else results['user']),
|
||||
NotifySendPulse.unquote(results['host']),
|
||||
)
|
||||
|
||||
# The 'to' makes it easier to use yaml configuration
|
||||
if 'to' in results['qsd'] and len(results['qsd']['to']):
|
||||
results['targets'] += \
|
||||
NotifySendPulse.parse_list(results['qsd']['to'])
|
||||
|
||||
# Handle Carbon Copy Addresses
|
||||
if 'cc' in results['qsd'] and len(results['qsd']['cc']):
|
||||
results['cc'] = \
|
||||
NotifySendPulse.parse_list(results['qsd']['cc'])
|
||||
|
||||
# Handle Blind Carbon Copy Addresses
|
||||
if 'bcc' in results['qsd'] and len(results['qsd']['bcc']):
|
||||
results['bcc'] = \
|
||||
NotifySendPulse.parse_list(results['qsd']['bcc'])
|
||||
|
||||
# Handle Blind Carbon Copy Addresses
|
||||
if 'template' in results['qsd'] and len(results['qsd']['template']):
|
||||
results['template'] = \
|
||||
NotifySendPulse.unquote(results['qsd']['template'])
|
||||
|
||||
# Add any template substitutions
|
||||
results['template_data'] = results['qsd+']
|
||||
|
||||
return results
|
|
@ -0,0 +1,204 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# BSD 2-Clause License
|
||||
#
|
||||
# Apprise - Push Notification Library.
|
||||
# Copyright (c) 2024, Chris Caron <lead2gold@gmail.com>
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without
|
||||
# modification, are permitted provided that the following conditions are met:
|
||||
#
|
||||
# 1. Redistributions of source code must retain the above copyright notice,
|
||||
# this list of conditions and the following disclaimer.
|
||||
#
|
||||
# 2. Redistributions in binary form must reproduce the above copyright notice,
|
||||
# this list of conditions and the following disclaimer in the documentation
|
||||
# and/or other materials provided with the distribution.
|
||||
#
|
||||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
||||
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
|
||||
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
||||
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
||||
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
||||
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||||
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||||
# POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
from unittest import mock
|
||||
|
||||
import os
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
from apprise import Apprise
|
||||
from apprise import NotifyType
|
||||
from apprise import AppriseAttachment
|
||||
from apprise.plugins.sendpulse import NotifySendPulse
|
||||
from helpers import AppriseURLTester
|
||||
|
||||
# Disable logging for a cleaner testing output
|
||||
import logging
|
||||
logging.disable(logging.CRITICAL)
|
||||
|
||||
# Attachment Directory
|
||||
TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var')
|
||||
|
||||
# Our Testing URLs
|
||||
apprise_url_tests = (
|
||||
('sendpulse://', {
|
||||
'instance': None,
|
||||
}),
|
||||
('sendpulse://:@/', {
|
||||
'instance': None,
|
||||
}),
|
||||
('sendpulse://abcd', {
|
||||
# Just an broken email (no email, client_id or secret)
|
||||
'instance': None,
|
||||
}),
|
||||
('sendpulse://abcd@host', {
|
||||
# Just an Email specified, no client_id or client_secret
|
||||
'instance': None,
|
||||
}),
|
||||
('sendpulse://user@example.com/client_id/client_secret/', {
|
||||
'instance': NotifySendPulse,
|
||||
}),
|
||||
('sendpulse://user@example.com/client_id/client_secret/'
|
||||
'?bcc=l2g@nuxref.com', {
|
||||
# A good email with Blind Carbon Copy
|
||||
'instance': NotifySendPulse,
|
||||
}),
|
||||
('sendpulse://user@example.com/client_id/client_secret/'
|
||||
'?cc=l2g@nuxref.com', {
|
||||
# A good email with Carbon Copy
|
||||
'instance': NotifySendPulse,
|
||||
}),
|
||||
('sendpulse://user@example.com/client_id/client_secret/'
|
||||
'?to=l2g@nuxref.com', {
|
||||
# A good email with Carbon Copy
|
||||
'instance': NotifySendPulse,
|
||||
}),
|
||||
('sendpulse://user@example.com/client_id/client_secret/'
|
||||
'?template=1234', {
|
||||
# A good email with a template + no substitutions
|
||||
'instance': NotifySendPulse,
|
||||
}),
|
||||
('sendpulse://user@example.com/client_id/client_secret/'
|
||||
'?template=1234&+sub=value&+sub2=value2', {
|
||||
# A good email with a template + substitutions
|
||||
'instance': NotifySendPulse,
|
||||
|
||||
# Our expected url(privacy=True) startswith() response:
|
||||
'privacy_url': 'sendpulse://a...d:user@example.com/',
|
||||
}),
|
||||
('sendpulse://user@example.com/client_id/client_secret/', {
|
||||
'instance': NotifySendPulse,
|
||||
# force a failure
|
||||
'response': False,
|
||||
'requests_response_code': requests.codes.internal_server_error,
|
||||
}),
|
||||
('sendpulse://user@example.com/client_id/client_secret/', {
|
||||
'instance': NotifySendPulse,
|
||||
# throw a bizzare code forcing us to fail to look it up
|
||||
'response': False,
|
||||
'requests_response_code': 999,
|
||||
}),
|
||||
('sendpulse://user@example.com/client_id/client_secret/', {
|
||||
'instance': NotifySendPulse,
|
||||
# Throws a series of connection and transfer exceptions when this flag
|
||||
# is set and tests that we gracfully handle them
|
||||
'test_requests_exceptions': True,
|
||||
}),
|
||||
)
|
||||
|
||||
|
||||
def test_plugin_sendpulse_urls():
|
||||
"""
|
||||
NotifySendPulse() Apprise URLs
|
||||
|
||||
"""
|
||||
|
||||
# Run our general tests
|
||||
AppriseURLTester(tests=apprise_url_tests).run_all()
|
||||
|
||||
|
||||
@mock.patch('requests.get')
|
||||
@mock.patch('requests.post')
|
||||
def test_plugin_sendpulse_edge_cases(mock_post, mock_get):
|
||||
"""
|
||||
NotifySendPulse() Edge Cases
|
||||
|
||||
"""
|
||||
|
||||
# no client_id
|
||||
with pytest.raises(TypeError):
|
||||
NotifySendPulse(
|
||||
client_id='abcd', client_secret=None,
|
||||
from_email='user@example.com')
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
NotifySendPulse(
|
||||
client_id=None, client_secret='abcd',
|
||||
from_email='user@example.com')
|
||||
|
||||
# invalid from email
|
||||
with pytest.raises(TypeError):
|
||||
NotifySendPulse(
|
||||
client_id='abcd', client_secret='abcd', from_email='!invalid')
|
||||
|
||||
# no email
|
||||
with pytest.raises(TypeError):
|
||||
NotifySendPulse(
|
||||
client_id='abcd', client_secret='abcd', from_email=None)
|
||||
|
||||
# Invalid To email address
|
||||
NotifySendPulse(
|
||||
client_id='abcd', client_secret='abcd',
|
||||
from_email='user@example.com', targets="!invalid")
|
||||
|
||||
# Test invalid bcc/cc entries mixed with good ones
|
||||
assert isinstance(NotifySendPulse(
|
||||
client_id='abcd', client_secret='abcd',
|
||||
from_email='l2g@example.com',
|
||||
bcc=('abc@def.com', '!invalid'),
|
||||
cc=('abc@test.org', '!invalid')), NotifySendPulse)
|
||||
|
||||
|
||||
@mock.patch('requests.get')
|
||||
@mock.patch('requests.post')
|
||||
def test_plugin_sendpulse_attachments(mock_post, mock_get):
|
||||
"""
|
||||
NotifySendPulse() Attachments
|
||||
|
||||
"""
|
||||
|
||||
request = mock.Mock()
|
||||
request.status_code = requests.codes.ok
|
||||
|
||||
# Prepare Mock
|
||||
mock_post.return_value = request
|
||||
mock_get.return_value = request
|
||||
|
||||
path = os.path.join(TEST_VAR_DIR, 'apprise-test.gif')
|
||||
attach = AppriseAttachment(path)
|
||||
obj = Apprise.instantiate('sendpulse://user@example.com/abcd/abcd')
|
||||
assert isinstance(obj, NotifySendPulse)
|
||||
assert obj.notify(
|
||||
body='body', title='title', notify_type=NotifyType.INFO,
|
||||
attach=attach) is True
|
||||
|
||||
mock_post.reset_mock()
|
||||
mock_get.reset_mock()
|
||||
|
||||
# Try again in a use case where we can't access the file
|
||||
with mock.patch("os.path.isfile", return_value=False):
|
||||
assert obj.notify(
|
||||
body='body', title='title', notify_type=NotifyType.INFO,
|
||||
attach=attach) is False
|
||||
|
||||
# Try again in a use case where we can't access the file
|
||||
with mock.patch("builtins.open", side_effect=OSError):
|
||||
assert obj.notify(
|
||||
body='body', title='title', notify_type=NotifyType.INFO,
|
||||
attach=attach) is False
|
Loading…
Reference in New Issue