mirror of https://github.com/caronc/apprise
Added SendPulse Support (#1192)
parent
f4e3f6e676
commit
6a21495a09
|
@ -121,6 +121,7 @@ The table below identifies the services this tool supports and some example serv
|
|||
| [RSyslog](https://github.com/caronc/apprise/wiki/Notify_rsyslog) | rsyslog:// | (UDP) 514 | rsyslog://hostname<br />rsyslog://hostname/Facility
|
||||
| [Ryver](https://github.com/caronc/apprise/wiki/Notify_ryver) | ryver:// | (TCP) 443 | ryver://Organization/Token<br />ryver://botname@Organization/Token
|
||||
| [SendGrid](https://github.com/caronc/apprise/wiki/Notify_sendgrid) | sendgrid:// | (TCP) 443 | sendgrid://APIToken:FromEmail/<br />sendgrid://APIToken:FromEmail/ToEmail<br />sendgrid://APIToken:FromEmail/ToEmail1/ToEmail2/ToEmailN/
|
||||
| [SendPulse](https://github.com/caronc/apprise/wiki/Notify_sendpulse) | sendpulse:// | (TCP) 443 | sendpulse://user@host/ClientId/ClientSecret<br />sendpulse://user@host/ClientId/clientSecret/ToEmail<br />sendpulse://user@host/ClientId/ClientSecret/ToEmail1/ToEmail2/ToEmailN/
|
||||
| [ServerChan](https://github.com/caronc/apprise/wiki/Notify_serverchan) | schan:// | (TCP) 443 | schan://sendkey/
|
||||
| [Signal API](https://github.com/caronc/apprise/wiki/Notify_signal) | signal:// or signals:// | (TCP) 80 or 443 | signal://hostname:port/FromPhoneNo<br/>signal://hostname:port/FromPhoneNo/ToPhoneNo<br/>signal://hostname:port/FromPhoneNo/ToPhoneNo1/ToPhoneNo2/ToPhoneNoN/
|
||||
| [SIGNL4](https://github.com/caronc/apprise/wiki/Notify_signl4) | signl4:// | (TCP) 80 or 443 | signl4://hostname
|
||||
|
|
|
@ -0,0 +1,778 @@
|
|||
# BSD 2-Clause License
|
||||
#
|
||||
# Apprise - Push Notification Library.
|
||||
# Copyright (c) 2025, Chris Caron <lead2gold@gmail.com>
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without
|
||||
# modification, are permitted provided that the following conditions are met:
|
||||
#
|
||||
# 1. Redistributions of source code must retain the above copyright notice,
|
||||
# this list of conditions and the following disclaimer.
|
||||
#
|
||||
# 2. Redistributions in binary form must reproduce the above copyright notice,
|
||||
# this list of conditions and the following disclaimer in the documentation
|
||||
# and/or other materials provided with the distribution.
|
||||
#
|
||||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
||||
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
|
||||
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
||||
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
||||
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
||||
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||||
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||||
# POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
# Simple API Reference:
|
||||
# - https://sendpulse.com/integrations/api/smtp
|
||||
|
||||
import base64
|
||||
from email.utils import formataddr
|
||||
from json import dumps, loads
|
||||
import re
|
||||
|
||||
import requests
|
||||
|
||||
from .. import exception
|
||||
from ..common import NotifyFormat, NotifyType, PersistentStoreMode
|
||||
from ..conversion import convert_between
|
||||
from ..locale import gettext_lazy as _
|
||||
from ..utils.parse import is_email, parse_emails, validate_regex
|
||||
from .base import NotifyBase
|
||||
|
||||
|
||||
class NotifySendPulse(NotifyBase):
|
||||
"""
|
||||
A wrapper for Notify SendPulse Notifications
|
||||
"""
|
||||
|
||||
# The default descriptive name associated with the Notification
|
||||
service_name = "SendPulse"
|
||||
|
||||
# The services URL
|
||||
service_url = "https://sendpulse.com"
|
||||
|
||||
# The default secure protocol
|
||||
secure_protocol = "sendpulse"
|
||||
|
||||
# A URL that takes you to the setup/help of the specific protocol
|
||||
setup_url = "https://github.com/caronc/apprise/wiki/Notify_sendpulse"
|
||||
|
||||
# Default to markdown
|
||||
notify_format = NotifyFormat.HTML
|
||||
|
||||
# The default Email API URL to use
|
||||
notify_email_url = "https://api.sendpulse.com/smtp/emails"
|
||||
|
||||
# Our OAuth Query
|
||||
notify_oauth_url = "https://api.sendpulse.com/oauth/access_token"
|
||||
|
||||
# Support attachments
|
||||
attachment_support = True
|
||||
|
||||
# Allow 300 requests per minute.
|
||||
# 60/300 = 0.2
|
||||
request_rate_per_sec = 0.2
|
||||
|
||||
# Our default is to no not use persistent storage beyond in-memory
|
||||
# reference
|
||||
storage_mode = PersistentStoreMode.AUTO
|
||||
|
||||
# Token expiry if not detected in seconds (below is 1 hr)
|
||||
token_expiry = 3600
|
||||
|
||||
# The number of seconds to grace for early token renewal
|
||||
# Below states that 10 seconds bfore our token expiry, we'll
|
||||
# attempt to renew it
|
||||
token_expiry_edge = 10
|
||||
|
||||
# Support attachments
|
||||
attachment_support = True
|
||||
|
||||
# The default subject to use if one isn't specified.
|
||||
default_empty_subject = "<no subject>"
|
||||
|
||||
# Define object templates
|
||||
templates = (
|
||||
"{schema}://{user}@{host}/{client_secret}/",
|
||||
"{schema}://{user}@{host}/{client_id}/{client_secret}/{targets}",
|
||||
)
|
||||
|
||||
# Define our template arguments
|
||||
template_tokens = dict(NotifyBase.template_tokens, **{
|
||||
"user": {
|
||||
"name": _("User Name"),
|
||||
"type": "string",
|
||||
},
|
||||
"host": {
|
||||
"name": _("Domain"),
|
||||
"type": "string",
|
||||
"required": True,
|
||||
},
|
||||
"client_id": {
|
||||
"name": _("Client ID"),
|
||||
"type": "string",
|
||||
"required": True,
|
||||
"private": True,
|
||||
"regex": (r"^[A-Z0-9._-]+$", "i"),
|
||||
},
|
||||
"client_secret": {
|
||||
"name": _("Client Secret"),
|
||||
"type": "string",
|
||||
"required": True,
|
||||
"private": True,
|
||||
"regex": (r"^[A-Z0-9._-]+$", "i"),
|
||||
},
|
||||
"target_email": {
|
||||
"name": _("Target Email"),
|
||||
"type": "string",
|
||||
"map_to": "targets",
|
||||
},
|
||||
"targets": {
|
||||
"name": _("Targets"),
|
||||
"type": "list:string",
|
||||
},
|
||||
})
|
||||
|
||||
# Define our template arguments
|
||||
template_args = dict(NotifyBase.template_args, **{
|
||||
"to": {
|
||||
"alias_of": "targets",
|
||||
},
|
||||
"from": {
|
||||
"name": _("From Email"),
|
||||
"type": "string",
|
||||
"map_to": "from_addr",
|
||||
},
|
||||
"cc": {
|
||||
"name": _("Carbon Copy"),
|
||||
"type": "list:string",
|
||||
},
|
||||
"bcc": {
|
||||
"name": _("Blind Carbon Copy"),
|
||||
"type": "list:string",
|
||||
},
|
||||
"template": {
|
||||
# The template ID is an integer
|
||||
"name": _("Template ID"),
|
||||
"type": "int",
|
||||
},
|
||||
"id": {
|
||||
"alias_of": "client_id",
|
||||
},
|
||||
"secret": {
|
||||
"alias_of": "client_secret",
|
||||
}
|
||||
})
|
||||
|
||||
# Support Template Dynamic Variables (Substitutions)
|
||||
template_kwargs = {
|
||||
"template_data": {
|
||||
"name": _("Template Data"),
|
||||
"prefix": "+",
|
||||
},
|
||||
}
|
||||
|
||||
def __init__(self, client_id, client_secret, from_addr=None, targets=None,
|
||||
cc=None, bcc=None, template=None, template_data=None,
|
||||
**kwargs):
|
||||
"""
|
||||
Initialize Notify SendPulse Object
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
|
||||
# For tracking our email -> name lookups
|
||||
self.names = {}
|
||||
|
||||
# Temporary from_addr to work with for parsing
|
||||
_from_addr = [self.app_id, ""]
|
||||
|
||||
if self.user:
|
||||
if self.host:
|
||||
# Prepare the bases of our email
|
||||
_from_addr = [_from_addr[0], "{}@{}".format(
|
||||
re.split(r"[\s@]+", self.user)[0],
|
||||
self.host,
|
||||
)]
|
||||
|
||||
else:
|
||||
result = is_email(self.user)
|
||||
if result:
|
||||
# Prepare the bases of our email and include domain
|
||||
self.host = result["domain"]
|
||||
_from_addr = [
|
||||
result["name"] if result["name"]
|
||||
else _from_addr[0], self.user]
|
||||
|
||||
if isinstance(from_addr, str):
|
||||
result = is_email(from_addr)
|
||||
if result:
|
||||
_from_addr = (
|
||||
result["name"] if result["name"] else _from_addr[0],
|
||||
result["full_email"])
|
||||
else:
|
||||
# Only update the string but use the already detected info
|
||||
_from_addr[0] = from_addr
|
||||
|
||||
result = is_email(_from_addr[1])
|
||||
if not result:
|
||||
# Parse Source domain based on from_addr
|
||||
msg = "Invalid ~From~ email specified: {}".format(
|
||||
"{} <{}>".format(_from_addr[0], _from_addr[1])
|
||||
if _from_addr[0] else "{}".format(_from_addr[1]))
|
||||
self.logger.warning(msg)
|
||||
raise TypeError(msg)
|
||||
|
||||
# Store our lookup
|
||||
self.from_addr = _from_addr[1]
|
||||
self.names[_from_addr[1]] = _from_addr[0]
|
||||
|
||||
# Client ID
|
||||
self.client_id = validate_regex(
|
||||
client_id, *self.template_tokens["client_id"]["regex"])
|
||||
if not self.client_id:
|
||||
msg = "An invalid SendPulse Client ID " \
|
||||
"({}) was specified.".format(client_id)
|
||||
self.logger.warning(msg)
|
||||
raise TypeError(msg)
|
||||
|
||||
# Client Secret
|
||||
self.client_secret = validate_regex(
|
||||
client_secret, *self.template_tokens["client_secret"]["regex"])
|
||||
if not self.client_secret:
|
||||
msg = "An invalid SendPulse Client Secret " \
|
||||
"({}) was specified.".format(client_secret)
|
||||
self.logger.warning(msg)
|
||||
raise TypeError(msg)
|
||||
|
||||
# Acquire Targets (To Emails)
|
||||
self.targets = []
|
||||
|
||||
# Acquire Carbon Copies
|
||||
self.cc = set()
|
||||
|
||||
# Acquire Blind Carbon Copies
|
||||
self.bcc = set()
|
||||
|
||||
# No template
|
||||
self.template = None
|
||||
if template:
|
||||
try:
|
||||
# Store our template
|
||||
self.template = int(template)
|
||||
|
||||
except (TypeError, ValueError):
|
||||
# Not a valid integer; ignore entry
|
||||
err = "The SendPulse Template ID specified ({}) is invalid."\
|
||||
.format(template)
|
||||
self.logger.warning(err)
|
||||
raise TypeError(err) from None
|
||||
|
||||
# Now our dynamic template data (if defined)
|
||||
self.template_data = template_data \
|
||||
if isinstance(template_data, dict) else {}
|
||||
|
||||
if targets:
|
||||
# Validate recipients (to:) and drop bad ones:
|
||||
for recipient in parse_emails(targets):
|
||||
result = is_email(recipient)
|
||||
if result:
|
||||
self.targets.append(result["full_email"])
|
||||
if result["name"]:
|
||||
self.names[result["full_email"]] = result["name"]
|
||||
continue
|
||||
|
||||
self.logger.warning(
|
||||
"Dropped invalid To email "
|
||||
"({}) specified.".format(recipient),
|
||||
)
|
||||
|
||||
else:
|
||||
# If our target email list is empty we want to add ourselves to it
|
||||
self.targets.append(self.from_addr)
|
||||
|
||||
# Validate recipients (cc:) and drop bad ones:
|
||||
for recipient in parse_emails(cc):
|
||||
|
||||
result = is_email(recipient)
|
||||
if result:
|
||||
self.cc.add(result["full_email"])
|
||||
if result["name"]:
|
||||
self.names[result["full_email"]] = result["name"]
|
||||
continue
|
||||
|
||||
self.logger.warning(
|
||||
"Dropped invalid Carbon Copy email "
|
||||
"({}) specified.".format(recipient),
|
||||
)
|
||||
|
||||
# Validate recipients (bcc:) and drop bad ones:
|
||||
for recipient in parse_emails(bcc):
|
||||
|
||||
result = is_email(recipient)
|
||||
if result:
|
||||
self.bcc.add(result["full_email"])
|
||||
if result["name"]:
|
||||
self.names[result["full_email"]] = result["name"]
|
||||
continue
|
||||
|
||||
self.logger.warning(
|
||||
"Dropped invalid Blind Carbon Copy email "
|
||||
"({}) specified.".format(recipient),
|
||||
)
|
||||
|
||||
if len(self.targets) == 0:
|
||||
# Notify ourselves
|
||||
self.targets.append(self.from_addr)
|
||||
|
||||
return
|
||||
|
||||
@property
|
||||
def url_identifier(self):
|
||||
"""
|
||||
Returns all of the identifiers that make this URL unique from
|
||||
another simliar one. Targets or end points should never be identified
|
||||
here.
|
||||
"""
|
||||
return (self.secure_protocol, self.client_id, self.client_secret)
|
||||
|
||||
def url(self, privacy=False, *args, **kwargs):
|
||||
"""
|
||||
Returns the URL built dynamically based on specified arguments.
|
||||
"""
|
||||
|
||||
# Our URL parameters
|
||||
params = self.url_parameters(privacy=privacy, *args, **kwargs)
|
||||
|
||||
if len(self.cc) > 0:
|
||||
# Handle our Carbon Copy Addresses
|
||||
params["cc"] = ",".join([
|
||||
formataddr(
|
||||
(self.names.get(e, False), e),
|
||||
# Swap comma for it's escaped url code (if detected) since
|
||||
# we're using that as a delimiter
|
||||
charset="utf-8").replace(",", "%2C")
|
||||
for e in self.cc])
|
||||
|
||||
if len(self.bcc) > 0:
|
||||
# Handle our Blind Carbon Copy Addresses
|
||||
params["bcc"] = ",".join([
|
||||
formataddr(
|
||||
(self.names.get(e, False), e),
|
||||
# Swap comma for it's escaped url code (if detected) since
|
||||
# we're using that as a delimiter
|
||||
charset="utf-8").replace(",", "%2C")
|
||||
for e in self.bcc])
|
||||
|
||||
if self.template:
|
||||
# Handle our Template ID if if was specified
|
||||
params["template"] = self.template
|
||||
|
||||
# handle from=
|
||||
if self.names[self.from_addr] != self.app_id:
|
||||
params["from"] = self.names[self.from_addr]
|
||||
|
||||
# Append our template_data into our parameter list
|
||||
params.update(
|
||||
{"+{}".format(k): v for k, v in self.template_data.items()})
|
||||
|
||||
# a simple boolean check as to whether we display our target emails
|
||||
# or not
|
||||
has_targets = \
|
||||
not (len(self.targets) == 1 and self.targets[0] == self.from_addr)
|
||||
|
||||
return "{schema}://{source}/{cid}/{secret}/{targets}?{params}".format(
|
||||
schema=self.secure_protocol,
|
||||
source=self.from_addr,
|
||||
cid=self.pprint(self.client_id, privacy, safe=""),
|
||||
secret=self.pprint(self.client_secret, privacy, safe=""),
|
||||
targets="" if not has_targets else "/".join(
|
||||
[NotifySendPulse.quote(x, safe="") for x in self.targets]),
|
||||
params=NotifySendPulse.urlencode(params),
|
||||
)
|
||||
|
||||
def __len__(self):
|
||||
"""
|
||||
Returns the number of targets associated with this notification
|
||||
"""
|
||||
return len(self.targets)
|
||||
|
||||
def login(self):
|
||||
"""
|
||||
Authenticates with the server to get a access_token
|
||||
"""
|
||||
self.store.clear("access_token")
|
||||
payload = {
|
||||
"grant_type": "client_credentials",
|
||||
"client_id": self.client_id,
|
||||
"client_secret": self.client_secret,
|
||||
}
|
||||
|
||||
success, response = self._fetch(self.notify_oauth_url, payload)
|
||||
if not success:
|
||||
return False
|
||||
|
||||
access_token = response.get("access_token")
|
||||
|
||||
# If we get here, we're authenticated
|
||||
try:
|
||||
expires = \
|
||||
int(response.get("expires_in")) - self.token_expiry_edge
|
||||
if expires <= self.token_expiry_edge:
|
||||
self.logger.error(
|
||||
"SendPulse token expiry limit returned was invalid")
|
||||
return False
|
||||
|
||||
elif expires > self.token_expiry:
|
||||
self.logger.warning(
|
||||
"SendPulse token expiry limit fixed to: {}s"
|
||||
.format(self.token_expiry))
|
||||
expires = self.token_expiry - self.token_expiry_edge
|
||||
|
||||
except (AttributeError, TypeError, ValueError):
|
||||
# expires_in was not an integer
|
||||
self.logger.warning(
|
||||
"SendPulse token expiry limit presumed to be: {}s".format(
|
||||
self.token_expiry))
|
||||
expires = self.token_expiry - self.token_expiry_edge
|
||||
|
||||
self.store.set("access_token", access_token, expires=expires)
|
||||
|
||||
return access_token
|
||||
|
||||
def send(self, body, title="", notify_type=NotifyType.INFO, attach=None,
|
||||
**kwargs):
|
||||
"""
|
||||
Perform SendPulse Notification
|
||||
"""
|
||||
|
||||
access_token = self.store.get("access_token") or self.login()
|
||||
if not access_token:
|
||||
return False
|
||||
|
||||
# error tracking (used for function return)
|
||||
has_error = False
|
||||
|
||||
# A Simple Email Payload Template
|
||||
_payload = {
|
||||
"email": {
|
||||
"from": {
|
||||
"name": self.names[self.from_addr],
|
||||
"email": self.from_addr,
|
||||
},
|
||||
# To is populated further on
|
||||
"to": [],
|
||||
# A subject is a requirement, so if none is specified we must
|
||||
# set a default with at least 1 character or SendPulse will
|
||||
# deny our request
|
||||
"subject": title if title else self.default_empty_subject,
|
||||
}
|
||||
}
|
||||
|
||||
# Prepare Email Message
|
||||
if self.notify_format == NotifyFormat.HTML:
|
||||
# HTML
|
||||
_payload["email"].update({
|
||||
"text": convert_between(
|
||||
NotifyFormat.HTML, NotifyFormat.TEXT, body),
|
||||
"html": base64.b64encode(body.encode("utf-8")).decode("ascii"),
|
||||
})
|
||||
|
||||
else: # Text
|
||||
_payload["email"]["text"] = body
|
||||
|
||||
if attach and self.attachment_support:
|
||||
attachments = {}
|
||||
|
||||
# Send our attachments
|
||||
for no, attachment in enumerate(attach, start=1):
|
||||
# Perform some simple error checking
|
||||
if not attachment:
|
||||
# We could not access the attachment
|
||||
self.logger.error(
|
||||
"Could not access SendPulse attachment {}.".format(
|
||||
attachment.url(privacy=True)))
|
||||
return False
|
||||
|
||||
try:
|
||||
attachments[
|
||||
attachment.name if attachment.name
|
||||
else f"file{no:03}.dat"] = attachment.base64()
|
||||
|
||||
except exception.AppriseException:
|
||||
# We could not access the attachment
|
||||
self.logger.error(
|
||||
"Could not access SendPulse attachment {}.".format(
|
||||
attachment.url(privacy=True)))
|
||||
return False
|
||||
|
||||
self.logger.debug(
|
||||
"Appending SendPulse attachment {}".format(
|
||||
attachment.url(privacy=True)))
|
||||
|
||||
# Append our attachments to the payload
|
||||
_payload["email"].update({
|
||||
"attachments_binary": attachments,
|
||||
})
|
||||
|
||||
if self.template:
|
||||
_payload["email"].update({
|
||||
"template": {
|
||||
"id": self.template,
|
||||
"variables": self.template_data,
|
||||
}})
|
||||
|
||||
targets = list(self.targets)
|
||||
while len(targets) > 0:
|
||||
target = targets.pop(0)
|
||||
|
||||
# Create a copy of our template
|
||||
payload = _payload.copy()
|
||||
|
||||
# the cc, bcc, to field must be unique or SendMail will fail, the
|
||||
# below code prepares this by ensuring the target isn't in the cc
|
||||
# list or bcc list. It also makes sure the cc list does not contain
|
||||
# any of the bcc entries
|
||||
cc = (self.cc - self.bcc - {target})
|
||||
bcc = (self.bcc - {target})
|
||||
|
||||
#
|
||||
# prepare our 'to'
|
||||
#
|
||||
to = {
|
||||
"email": target
|
||||
}
|
||||
if target in self.names:
|
||||
to["name"] = self.names[target]
|
||||
|
||||
# Set our target
|
||||
payload["email"]["to"] = [to]
|
||||
|
||||
if len(cc):
|
||||
payload["email"]["cc"] = []
|
||||
for email in cc:
|
||||
item = {
|
||||
"email": email,
|
||||
}
|
||||
if email in self.names:
|
||||
item["name"] = self.names[email]
|
||||
|
||||
payload["email"]["cc"].append(item)
|
||||
|
||||
if len(bcc):
|
||||
payload["email"]["bcc"] = []
|
||||
for email in bcc:
|
||||
item = {
|
||||
"email": email,
|
||||
}
|
||||
if email in self.names:
|
||||
item["name"] = self.names[email]
|
||||
|
||||
payload["email"]["bcc"].append(item)
|
||||
|
||||
# Perform our post
|
||||
success, response = self._fetch(
|
||||
self.notify_email_url, payload, target, retry=1)
|
||||
if not success:
|
||||
has_error = True
|
||||
continue
|
||||
|
||||
return not has_error
|
||||
|
||||
def _fetch(self, url, payload, target=None, retry=0):
|
||||
"""
|
||||
Wrapper to request.post() to manage it's response better and make
|
||||
the send() function cleaner and easier to maintain.
|
||||
|
||||
This function returns True if the _post was successful and False
|
||||
if it wasn't.
|
||||
"""
|
||||
headers = {
|
||||
"User-Agent": self.app_id,
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
access_token = self.store.get("access_token")
|
||||
if access_token:
|
||||
headers.update({"Authorization": f"Bearer {access_token}"})
|
||||
|
||||
self.logger.debug("SendPulse POST URL: {} (cert_verify={!r})".format(
|
||||
url, self.verify_certificate,
|
||||
))
|
||||
self.logger.debug("SendPulse Payload: {}".format(str(payload)))
|
||||
|
||||
# Prepare our default response
|
||||
response = {}
|
||||
|
||||
# Always call throttle before any remote server i/o is made
|
||||
self.throttle()
|
||||
try:
|
||||
r = requests.post(
|
||||
url,
|
||||
data=dumps(payload),
|
||||
headers=headers,
|
||||
verify=self.verify_certificate,
|
||||
timeout=self.request_timeout,
|
||||
)
|
||||
|
||||
try:
|
||||
response = loads(r.content)
|
||||
|
||||
except (AttributeError, TypeError, ValueError):
|
||||
# This gets thrown if we can't parse our JSON Response
|
||||
# - ValueError = r.content is Unparsable
|
||||
# - TypeError = r.content is None
|
||||
# - AttributeError = r is None
|
||||
self.logger.warning("Invalid response from SendPulse server.")
|
||||
self.logger.debug(
|
||||
"Response Details:\r\n{}".format(r.content))
|
||||
return (False, {})
|
||||
|
||||
# Reference status code
|
||||
status_code = r.status_code
|
||||
|
||||
# Key likely expired, we'll reset it and try one more time
|
||||
if status_code == requests.codes.unauthorized \
|
||||
and retry and self.login():
|
||||
return self._fetch(url, payload, target, retry=retry - 1)
|
||||
|
||||
if status_code not in (
|
||||
requests.codes.ok, requests.codes.accepted):
|
||||
# We had a problem
|
||||
status_str = \
|
||||
NotifySendPulse.http_response_code_lookup(
|
||||
status_code)
|
||||
|
||||
if target:
|
||||
self.logger.warning(
|
||||
"Failed to send SendPulse notification to {}: "
|
||||
"{}{}error={}.".format(
|
||||
target,
|
||||
status_str,
|
||||
", " if status_str else "",
|
||||
status_code))
|
||||
else:
|
||||
self.logger.warning(
|
||||
"SendPulse Authentication Request failed: "
|
||||
"{}{}error={}.".format(
|
||||
status_str,
|
||||
", " if status_str else "",
|
||||
status_code))
|
||||
|
||||
self.logger.debug(
|
||||
"Response Details:\r\n{}".format(r.content))
|
||||
|
||||
else:
|
||||
if target:
|
||||
self.logger.info(
|
||||
"Sent SendPulse notification to {}.".format(target))
|
||||
else:
|
||||
self.logger.debug("SendPulse authentication successful")
|
||||
|
||||
return (True, response)
|
||||
|
||||
except requests.RequestException as e:
|
||||
self.logger.warning(
|
||||
"A Connection error occurred sending SendPulse "
|
||||
"notification to {}.".format(target))
|
||||
self.logger.debug("Socket Exception: {}".format(str(e)))
|
||||
|
||||
return (False, response)
|
||||
|
||||
@staticmethod
|
||||
def parse_url(url):
|
||||
"""
|
||||
Parses the URL and returns enough arguments that can allow
|
||||
us to re-instantiate this object.
|
||||
|
||||
"""
|
||||
|
||||
results = NotifyBase.parse_url(url, verify_host=False)
|
||||
if not results:
|
||||
# We're done early as we couldn't load the results
|
||||
return results
|
||||
|
||||
# Define our minimum requirements; defining them now saves us from
|
||||
# having to if/else all kinds of branches below...
|
||||
results["from_addr"] = None
|
||||
results["client_id"] = None
|
||||
results["client_secret"] = None
|
||||
|
||||
# Prepare our targets
|
||||
results["targets"] = []
|
||||
|
||||
# Our URL looks like this:
|
||||
# {schema}://{from_addr}:{client_id}/{client_secret}/{targets}
|
||||
#
|
||||
# which actually equates to:
|
||||
# {schema}://{user}@{host}/{client_id}/{client_secret}
|
||||
# /{email1}/{email2}/etc..
|
||||
# ^ ^
|
||||
# | |
|
||||
# -from addr-
|
||||
if "from" in results["qsd"]:
|
||||
results["from_addr"] = \
|
||||
NotifySendPulse.unquote(results["qsd"]["from"].rstrip())
|
||||
|
||||
if is_email(results["from_addr"]):
|
||||
# Our hostname is free'd up to be interpreted as part of the
|
||||
# targets
|
||||
results["targets"].append(
|
||||
NotifySendPulse.unquote(results["host"]))
|
||||
results["host"] = ""
|
||||
|
||||
if "user" in results["qsd"] and \
|
||||
is_email(NotifySendPulse.unquote(results["user"])):
|
||||
# Our hostname is free'd up to be interpreted as part of the
|
||||
# targets
|
||||
results["targets"].append(NotifySendPulse.unquote(results["host"]))
|
||||
results["host"] = ""
|
||||
|
||||
# Get our potential email targets
|
||||
# First 2 elements are the client_id and client_secret
|
||||
results["targets"] += NotifySendPulse.split_path(results["fullpath"])
|
||||
# check for our client id
|
||||
if "id" in results["qsd"] and len(results["qsd"]["id"]):
|
||||
# Store our Client ID
|
||||
results["client_id"] = \
|
||||
NotifySendPulse.unquote(results["qsd"]["id"])
|
||||
|
||||
elif results["targets"]:
|
||||
# Store our Client ID
|
||||
results["client_id"] = results["targets"].pop(0)
|
||||
|
||||
if "secret" in results["qsd"] and len(results["qsd"]["secret"]):
|
||||
# Store our Client Secret
|
||||
results["client_secret"] = \
|
||||
NotifySendPulse.unquote(results["qsd"]["secret"])
|
||||
|
||||
elif results["targets"]:
|
||||
# Store our Client Secret
|
||||
results["client_secret"] = results["targets"].pop(0)
|
||||
|
||||
# The 'to' makes it easier to use yaml configuration
|
||||
if "to" in results["qsd"] and len(results["qsd"]["to"]):
|
||||
results["targets"].append(
|
||||
NotifySendPulse.unquote(results["qsd"]["to"]))
|
||||
|
||||
# Handle Carbon Copy Addresses
|
||||
if "cc" in results["qsd"] and len(results["qsd"]["cc"]):
|
||||
results["cc"] = \
|
||||
NotifySendPulse.unquote(results["qsd"]["cc"])
|
||||
|
||||
# Handle Blind Carbon Copy Addresses
|
||||
if "bcc" in results["qsd"] and len(results["qsd"]["bcc"]):
|
||||
results["bcc"] = \
|
||||
NotifySendPulse.unquote(results["qsd"]["bcc"])
|
||||
|
||||
# Handle Blind Carbon Copy Addresses
|
||||
if "template" in results["qsd"] and len(results["qsd"]["template"]):
|
||||
results["template"] = \
|
||||
NotifySendPulse.unquote(results["qsd"]["template"])
|
||||
|
||||
# Add any template substitutions
|
||||
results["template_data"] = results["qsd+"]
|
||||
|
||||
return results
|
|
@ -69,8 +69,8 @@ notification services. It supports sending alerts to platforms such as: \
|
|||
`ParsePlatform`, `Plivo`, `PopcornNotify`, `Prowl`, `Pushalot`, \
|
||||
`PushBullet`, `Pushjet`, `PushMe`, `Pushover`, `Pushplus`, `PushSafer`, \
|
||||
`Pushy`, `PushDeer`, `QQ Push`, `Revolt`, `Reddit`, `Resend`, `Rocket.Chat`, \
|
||||
`RSyslog`, `SendGrid`, `ServerChan`, `Seven`, `SFR`, `Signal`, `SIGNL4`, \
|
||||
`SimplePush`, `Sinch`, `Slack`, `SMPP`, `SMSEagle`, `SMS Manager`, \
|
||||
`RSyslog`, `SendGrid`, `SendPulse`, `ServerChan`, `Seven`, `SFR`, `Signal`, \
|
||||
`SIGNL4`, `SimplePush`, `Sinch`, `Slack`, `SMPP`, `SMSEagle`, `SMS Manager`, \
|
||||
`SMTP2Go`, `SparkPost`, `Splunk`, `Spike`, `Spug Push`, `Super Toasty`, \
|
||||
`Streamlabs`, `Stride`, `Synology Chat`, `Syslog`, `Techulus Push`, \
|
||||
`Telegram`, `Threema Gateway`, `Twilio`, `Twitter`, `Twist`, `Vapid`, \
|
||||
|
|
|
@ -138,6 +138,7 @@ keywords = [
|
|||
"RSyslog",
|
||||
"Ryver",
|
||||
"SendGrid",
|
||||
"SendPulse",
|
||||
"ServerChan",
|
||||
"SES",
|
||||
"Seven",
|
||||
|
|
|
@ -2028,6 +2028,9 @@ def test_parse_emails():
|
|||
assert len(results) == len(emails)
|
||||
for email in emails:
|
||||
assert email in results
|
||||
is_email = utils.parse.is_email(email)
|
||||
assert is_email
|
||||
assert is_email.get("name")
|
||||
|
||||
# pass the entries in as a list
|
||||
results = utils.parse.parse_emails(emails)
|
||||
|
@ -2115,7 +2118,8 @@ def test_parse_urls():
|
|||
# Commas and spaces found inside URLs are ignored
|
||||
urls = [
|
||||
(
|
||||
"mailgun://noreply@sandbox.mailgun.org/apikey/?to=test@example.com,test2@example.com,,"
|
||||
"mailgun://noreply@sandbox.mailgun.org/apikey/"
|
||||
"?to=test@example.com,test2@example.com,,"
|
||||
" abcd@example.com"
|
||||
),
|
||||
(
|
||||
|
@ -3029,7 +3033,8 @@ def test_cwe312_url():
|
|||
|
||||
assert (
|
||||
utils.cwe312.cwe312_url(
|
||||
"slack://mybot@xoxb-43598234231-3248932482278-BZK5Wj15B9mPh1RkShJoCZ44"
|
||||
"slack://mybot@xoxb-43598234231-3248932482278"
|
||||
"-BZK5Wj15B9mPh1RkShJoCZ44"
|
||||
"/lead2gold@gmail.com"
|
||||
)
|
||||
== "slack://mybot@x...4/l...m"
|
||||
|
|
|
@ -0,0 +1,559 @@
|
|||
# BSD 2-Clause License
|
||||
#
|
||||
# Apprise - Push Notification Library.
|
||||
# Copyright (c) 2025, Chris Caron <lead2gold@gmail.com>
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without
|
||||
# modification, are permitted provided that the following conditions are met:
|
||||
#
|
||||
# 1. Redistributions of source code must retain the above copyright notice,
|
||||
# this list of conditions and the following disclaimer.
|
||||
#
|
||||
# 2. Redistributions in binary form must reproduce the above copyright notice,
|
||||
# this list of conditions and the following disclaimer in the documentation
|
||||
# and/or other materials provided with the distribution.
|
||||
#
|
||||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
||||
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
|
||||
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
||||
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
||||
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
||||
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||||
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||||
# POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
from json import dumps, loads
|
||||
|
||||
# Disable logging for a cleaner testing output
|
||||
import logging
|
||||
import os
|
||||
from unittest import mock
|
||||
|
||||
from helpers import AppriseURLTester
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
from apprise import Apprise, AppriseAttachment, NotifyType
|
||||
from apprise.plugins.sendpulse import NotifySendPulse
|
||||
|
||||
logging.disable(logging.CRITICAL)
|
||||
|
||||
SENDPULSE_GOOD_RESPONSE = dumps({
|
||||
"access_token": "abc123",
|
||||
"expires_in": 3600,
|
||||
})
|
||||
|
||||
SENDPULSE_BAD_RESPONSE = "{"
|
||||
|
||||
# Attachment Directory
|
||||
TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), "var")
|
||||
|
||||
# Our Testing URLs
|
||||
apprise_url_tests = (
|
||||
("sendpulse://", {
|
||||
"instance": TypeError,
|
||||
}),
|
||||
("sendpulse://:@/", {
|
||||
"instance": TypeError,
|
||||
}),
|
||||
("sendpulse://abcd", {
|
||||
# invalid from email
|
||||
"instance": TypeError,
|
||||
}),
|
||||
("sendpulse://abcd@host.com", {
|
||||
# Just an Email specified, no client_id or client_secret
|
||||
"instance": TypeError,
|
||||
}),
|
||||
("sendpulse://user@example.com/client_id/cs/?template=invalid", {
|
||||
# Invalid template
|
||||
"instance": TypeError,
|
||||
}),
|
||||
("sendpulse://user@example.com/client_id/cs1/?template=123", {
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://user@example.com/client_id/cs1/", {
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://user@example.com/client_id/cs1/?format=text", {
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://user@example.com/client_id/cs1/?format=html", {
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://chris@example.com/client_id/cs1/?from=Chris", {
|
||||
# Set name only
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://?id=ci&secret=cs&user=chris@example.com", {
|
||||
# Set login through user= only
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://?id=ci&secret=cs&user=Chris<chris@example.com>", {
|
||||
# Set login through user= only
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://?id=ci&secret=cs&user=chris", {
|
||||
# Set login through user= only - invaild email
|
||||
"instance": TypeError,
|
||||
}),
|
||||
("sendpulse://example.com/client_id/cs1/?user=chris", {
|
||||
# Set user as a name only
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://client_id/cs1/?user=chris@example.ca", {
|
||||
# Set user as email
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://client_id/cs1/?from=Chris<chris@example.com>", {
|
||||
# set full email with name
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://?from=Chris<chris@example.com>&id=ci&secret=cs", {
|
||||
# leverage all get params from URL
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://user@example.com/client_id/cs1a/", {
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_BAD_RESPONSE,
|
||||
# Notify will fail because auth failed
|
||||
"response": False,
|
||||
}),
|
||||
("sendpulse://user@example.com/client_id/cs2/"
|
||||
"?bcc=l2g@nuxref.com", {
|
||||
# A good email with Blind Carbon Copy
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://user@example.com/client_id/cs2/"
|
||||
"?bcc=invalid", {
|
||||
# A good email with Blind Carbon Copy
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://user@example.com/client_id/cs3/"
|
||||
"?cc=l2g@nuxref.com", {
|
||||
# A good email with Carbon Copy
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://user@example.com/client_id/cs4/"
|
||||
"?cc=Chris<l2g@nuxref.com>", {
|
||||
# A good email with Carbon Copy
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://user@example.com/client_id/cs5/"
|
||||
"?cc=invalid", {
|
||||
# A good email with Carbon Copy
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://user@example.com/client_id/cs6/"
|
||||
"?to=invalid", {
|
||||
# an invalid to email
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://user@example.com/client_id/cs7/chris@example.com", {
|
||||
# An email with a designated to email
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://user@example.com/client_id/cs8/"
|
||||
"?to=Chris<chris@example.com>", {
|
||||
# An email with a full name in in To field
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://user@example.com/client_id/cs9/"
|
||||
"chris@example.com/chris2@example.com/Test<test@test.com>", {
|
||||
# Several emails to notify
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://user@example.com/client_id/cs10/"
|
||||
"?cc=Chris<chris@example.com>", {
|
||||
# An email with a full name in cc
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://user@example.com/client_id/cs11/"
|
||||
"?cc=chris@example.com", {
|
||||
# An email with a full name in cc
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://user@example.com/client_id/cs12/"
|
||||
"?bcc=Chris<chris@example.com>", {
|
||||
# An email with a full name in bcc
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://user@example.com/client_id/cs13/"
|
||||
"?bcc=chris@example.com", {
|
||||
# An email with a full name in bcc
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://user@example.com/client_id/cs14/"
|
||||
"?to=Chris<chris@example.com>", {
|
||||
# An email with a full name in bcc
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://user@example.com/client_id/cs15/"
|
||||
"?to=chris@example.com", {
|
||||
# An email with a full name in bcc
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://user@example.com/client_id/cs16/"
|
||||
"?template=1234&+sub=value&+sub2=value2", {
|
||||
# A good email with a template + substitutions
|
||||
"instance": NotifySendPulse,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
|
||||
# Our expected url(privacy=True) startswith() response:
|
||||
"privacy_url": "sendpulse://user@example.com/c...d/c...6/",
|
||||
}),
|
||||
("sendpulse://user@example.com/client_id/cs17/", {
|
||||
"instance": NotifySendPulse,
|
||||
# force a failure
|
||||
"response": False,
|
||||
"requests_response_code": requests.codes.internal_server_error,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://user@example.com/client_id/cs18/", {
|
||||
"instance": NotifySendPulse,
|
||||
# throw a bizzare code forcing us to fail to look it up
|
||||
"response": False,
|
||||
"requests_response_code": 999,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
("sendpulse://user@example.com/client_id/cs19/", {
|
||||
"instance": NotifySendPulse,
|
||||
# Throws a series of connection and transfer exceptions when this flag
|
||||
# is set and tests that we gracfully handle them
|
||||
"test_requests_exceptions": True,
|
||||
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
|
||||
}),
|
||||
)
|
||||
|
||||
|
||||
def test_plugin_sendpulse_urls():
|
||||
"""
|
||||
NotifySendPulse() Apprise URLs
|
||||
|
||||
"""
|
||||
|
||||
# Run our general tests
|
||||
AppriseURLTester(tests=apprise_url_tests).run_all()
|
||||
|
||||
|
||||
@mock.patch("requests.post")
|
||||
def test_plugin_sendpulse_edge_cases(mock_post):
|
||||
"""
|
||||
NotifySendPulse() Edge Cases
|
||||
"""
|
||||
request = mock.Mock()
|
||||
request.status_code = requests.codes.ok
|
||||
request.content = SENDPULSE_GOOD_RESPONSE
|
||||
|
||||
# Prepare Mock
|
||||
mock_post.return_value = request
|
||||
|
||||
obj = Apprise.instantiate(
|
||||
"sendpulse://user@example.com/ci/cs/Test<test@example.com>")
|
||||
|
||||
assert obj.notify(
|
||||
body="body", title="title", notify_type=NotifyType.INFO) is True
|
||||
|
||||
# Test our call count
|
||||
assert mock_post.call_count == 2
|
||||
|
||||
# Authentication
|
||||
assert mock_post.call_args_list[0][0][0] == \
|
||||
"https://api.sendpulse.com/oauth/access_token"
|
||||
|
||||
payload = loads(mock_post.call_args_list[0][1]["data"])
|
||||
assert payload == {
|
||||
"grant_type": "client_credentials",
|
||||
"client_id": "ci",
|
||||
"client_secret": "cs",
|
||||
}
|
||||
|
||||
assert mock_post.call_args_list[1][0][0] == \
|
||||
"https://api.sendpulse.com/smtp/emails"
|
||||
payload = loads(mock_post.call_args_list[1][1]["data"])
|
||||
|
||||
assert payload == {
|
||||
"email": {
|
||||
"from": {
|
||||
"email": "user@example.com", "name": "Apprise"
|
||||
},
|
||||
"to": [{"email": "test@example.com", "name": "Test"}],
|
||||
"subject": "title", "text": "body", "html": "Ym9keQ=="}}
|
||||
|
||||
mock_post.reset_mock()
|
||||
|
||||
obj = Apprise.instantiate("sendpulse://user@example.com/ci/cs/?from=John")
|
||||
|
||||
assert obj.notify(
|
||||
body="body", title="title", notify_type=NotifyType.INFO) is True
|
||||
|
||||
# Test our call count
|
||||
assert mock_post.call_count == 2
|
||||
|
||||
# Authentication
|
||||
assert mock_post.call_args_list[0][0][0] == \
|
||||
"https://api.sendpulse.com/oauth/access_token"
|
||||
|
||||
payload = loads(mock_post.call_args_list[0][1]["data"])
|
||||
assert payload == {
|
||||
"grant_type": "client_credentials",
|
||||
"client_id": "ci",
|
||||
"client_secret": "cs",
|
||||
}
|
||||
|
||||
assert mock_post.call_args_list[1][0][0] == \
|
||||
"https://api.sendpulse.com/smtp/emails"
|
||||
payload = loads(mock_post.call_args_list[1][1]["data"])
|
||||
|
||||
assert payload == {
|
||||
"email": {
|
||||
"from": {
|
||||
"email": "user@example.com", "name": "John"
|
||||
},
|
||||
"to": [{"email": "user@example.com", "name": "John"}],
|
||||
"subject": "title", "text": "body", "html": "Ym9keQ=="}}
|
||||
|
||||
mock_post.reset_mock()
|
||||
|
||||
# Second call no longer needs to authenticate
|
||||
assert obj.notify(
|
||||
body="body", title="title", notify_type=NotifyType.INFO) is True
|
||||
|
||||
assert mock_post.call_count == 1
|
||||
|
||||
assert mock_post.call_args_list[0][0][0] == \
|
||||
"https://api.sendpulse.com/smtp/emails"
|
||||
|
||||
# force an exception
|
||||
mock_post.side_effect = requests.RequestException
|
||||
assert obj.notify(
|
||||
body="body", title="title", notify_type=NotifyType.INFO) is False
|
||||
|
||||
# Set an invalid return code
|
||||
mock_post.side_effect = None
|
||||
request.status_code = 403
|
||||
assert obj.notify(
|
||||
body="body", title="title", notify_type=NotifyType.INFO) is False
|
||||
|
||||
# Test re-authentication
|
||||
mock_post.reset_mock()
|
||||
request = mock.Mock()
|
||||
obj = Apprise.instantiate("sendpulse://usr2@example.com/ci/cs/?from=Retry")
|
||||
|
||||
class sendpulse:
|
||||
def __init__(self):
|
||||
# 200 login okay
|
||||
# 401 on retrival
|
||||
# recursive re-attempt to login returns 200
|
||||
# fetch after works
|
||||
self._side_effect = iter([
|
||||
requests.codes.ok, requests.codes.unauthorized,
|
||||
requests.codes.ok, requests.codes.ok,
|
||||
])
|
||||
|
||||
@property
|
||||
def status_code(self):
|
||||
return next(self._side_effect)
|
||||
|
||||
@property
|
||||
def content(self):
|
||||
return SENDPULSE_GOOD_RESPONSE
|
||||
|
||||
mock_post.return_value = sendpulse()
|
||||
|
||||
assert obj.notify(
|
||||
body="body", title="title", notify_type=NotifyType.INFO) is True
|
||||
|
||||
assert mock_post.call_count == 4
|
||||
# Authentication
|
||||
assert mock_post.call_args_list[0][0][0] == \
|
||||
"https://api.sendpulse.com/oauth/access_token"
|
||||
# 401 received
|
||||
assert mock_post.call_args_list[1][0][0] == \
|
||||
"https://api.sendpulse.com/smtp/emails"
|
||||
# Re-authenticate
|
||||
assert mock_post.call_args_list[2][0][0] == \
|
||||
"https://api.sendpulse.com/oauth/access_token"
|
||||
# Try again
|
||||
assert mock_post.call_args_list[3][0][0] == \
|
||||
"https://api.sendpulse.com/smtp/emails"
|
||||
|
||||
# Test re-authentication (no recursive loops)
|
||||
mock_post.reset_mock()
|
||||
request = mock.Mock()
|
||||
obj = Apprise.instantiate("sendpulse://usr2@example.com/ci/cs/?from=Retry")
|
||||
|
||||
class sendpulse:
|
||||
def __init__(self):
|
||||
# oauth always returns okay but notify returns 401
|
||||
# recursive re-attempt only once
|
||||
self._side_effect = iter([
|
||||
requests.codes.ok, requests.codes.unauthorized,
|
||||
requests.codes.ok, requests.codes.unauthorized,
|
||||
requests.codes.ok, requests.codes.unauthorized,
|
||||
requests.codes.ok, requests.codes.unauthorized,
|
||||
requests.codes.ok, requests.codes.unauthorized,
|
||||
requests.codes.ok, requests.codes.unauthorized,
|
||||
requests.codes.ok, requests.codes.unauthorized,
|
||||
requests.codes.ok, requests.codes.unauthorized,
|
||||
])
|
||||
|
||||
@property
|
||||
def status_code(self):
|
||||
return next(self._side_effect)
|
||||
|
||||
@property
|
||||
def content(self):
|
||||
return SENDPULSE_GOOD_RESPONSE
|
||||
|
||||
mock_post.return_value = sendpulse()
|
||||
|
||||
assert obj.notify(
|
||||
body="body", title="title", notify_type=NotifyType.INFO) is False
|
||||
|
||||
assert mock_post.call_count == 4
|
||||
# Authentication
|
||||
assert mock_post.call_args_list[0][0][0] == \
|
||||
"https://api.sendpulse.com/oauth/access_token"
|
||||
# 401 received
|
||||
assert mock_post.call_args_list[1][0][0] == \
|
||||
"https://api.sendpulse.com/smtp/emails"
|
||||
# Re-authenticate
|
||||
assert mock_post.call_args_list[2][0][0] == \
|
||||
"https://api.sendpulse.com/oauth/access_token"
|
||||
# Last failed attempt
|
||||
assert mock_post.call_args_list[3][0][0] == \
|
||||
"https://api.sendpulse.com/smtp/emails"
|
||||
|
||||
mock_post.side_effect = None
|
||||
request = mock.Mock()
|
||||
request.status_code = requests.codes.ok
|
||||
request.content = SENDPULSE_GOOD_RESPONSE
|
||||
mock_post.return_value = request
|
||||
for expires_in in (None, -1, "garbage", 3600, 300000):
|
||||
request.content = dumps({
|
||||
"access_token": "abc123",
|
||||
"expires_in": expires_in,
|
||||
})
|
||||
|
||||
# Instantiate our object
|
||||
obj = Apprise.instantiate("sendpulse://user@example.com/ci/cs/")
|
||||
|
||||
# Test variations of responses
|
||||
obj.notify(
|
||||
body="body", title="title", notify_type=NotifyType.INFO)
|
||||
|
||||
# expires_in is missing
|
||||
request.content = dumps({
|
||||
"access_token": "abc123",
|
||||
})
|
||||
|
||||
# Instantiate our object
|
||||
obj = Apprise.instantiate("sendpulse://user@example.com/ci/cs/")
|
||||
assert obj.notify(
|
||||
body="body", title="title", notify_type=NotifyType.INFO) is True
|
||||
|
||||
|
||||
def test_plugin_sendpulse_fail_cases():
|
||||
"""
|
||||
NotifySendPulse() Fail Cases
|
||||
|
||||
"""
|
||||
|
||||
# no client_id
|
||||
with pytest.raises(TypeError):
|
||||
NotifySendPulse(
|
||||
client_id="abcd", client_secret=None,
|
||||
from_addr="user@example.com")
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
NotifySendPulse(
|
||||
client_id=None, client_secret="abcd123",
|
||||
from_addr="user@example.com")
|
||||
|
||||
# invalid from email
|
||||
with pytest.raises(TypeError):
|
||||
NotifySendPulse(
|
||||
client_id="abcd", client_secret="abcd456", from_addr="!invalid")
|
||||
|
||||
# no email
|
||||
with pytest.raises(TypeError):
|
||||
NotifySendPulse(
|
||||
client_id="abcd", client_secret="abcd789", from_addr=None)
|
||||
|
||||
# Invalid To email address
|
||||
NotifySendPulse(
|
||||
client_id="abcd", client_secret="abcd321",
|
||||
from_addr="user@example.com", targets="!invalid")
|
||||
|
||||
# Test invalid bcc/cc entries mixed with good ones
|
||||
assert isinstance(NotifySendPulse(
|
||||
client_id="abcd", client_secret="abcd654",
|
||||
from_addr="l2g@example.com",
|
||||
bcc=("abc@def.com", "!invalid"),
|
||||
cc=("abc@test.org", "!invalid")), NotifySendPulse)
|
||||
|
||||
|
||||
@mock.patch("requests.post")
|
||||
def test_plugin_sendpulse_attachments(mock_post):
|
||||
"""
|
||||
NotifySendPulse() Attachments
|
||||
|
||||
"""
|
||||
|
||||
request = mock.Mock()
|
||||
request.status_code = requests.codes.ok
|
||||
request.content = SENDPULSE_GOOD_RESPONSE
|
||||
|
||||
# Prepare Mock
|
||||
mock_post.return_value = request
|
||||
|
||||
path = os.path.join(TEST_VAR_DIR, "apprise-test.gif")
|
||||
attach = AppriseAttachment(path)
|
||||
obj = Apprise.instantiate("sendpulse://user@example.com/aaaa/bbbb")
|
||||
assert isinstance(obj, NotifySendPulse)
|
||||
assert obj.notify(
|
||||
body="body", title="title", notify_type=NotifyType.INFO,
|
||||
attach=attach) is True
|
||||
|
||||
mock_post.reset_mock()
|
||||
|
||||
# Try again in a use case where we can't access the file
|
||||
with mock.patch("os.path.isfile", return_value=False):
|
||||
assert obj.notify(
|
||||
body="body", title="title", notify_type=NotifyType.INFO,
|
||||
attach=attach) is False
|
||||
|
||||
# Try again in a use case where we can't access the file
|
||||
with mock.patch("builtins.open", side_effect=OSError):
|
||||
assert obj.notify(
|
||||
body="body", title="title", notify_type=NotifyType.INFO,
|
||||
attach=attach) is False
|
Loading…
Reference in New Issue