basic tests passing

pull/1192/head
Chris Caron 2024-08-30 22:26:50 -04:00
parent 85af43960b
commit f2fe8e9971
3 changed files with 677 additions and 163 deletions

View File

@ -1,7 +1,7 @@
# BSD 2-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2024, Chris Caron <lead2gold@gmail.com>
# Copyright (c) 2025, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
@ -30,15 +30,16 @@
import base64
from email.utils import formataddr
from json import dumps
from json import dumps, loads
import re
import requests
from .. import exception
from ..common import NotifyFormat, NotifyType
from ..common import NotifyFormat, NotifyType, PersistentStoreMode
from ..conversion import convert_between
from ..locale import gettext_lazy as _
from ..utils.parse import is_email, parse_emails, parse_list, validate_regex
from ..utils.parse import is_email, parse_emails, validate_regex
from .base import NotifyBase
@ -65,6 +66,9 @@ class NotifySendPulse(NotifyBase):
# The default Email API URL to use
notify_email_url = "https://api.sendpulse.com/smtp/emails"
# Our OAuth Query
notify_oauth_url = "https://api.sendpulse.com/oauth/access_token"
# Support attachments
attachment_support = True
@ -72,6 +76,18 @@ class NotifySendPulse(NotifyBase):
# 60/300 = 0.2
request_rate_per_sec = 0.2
# Our default is to no not use persistent storage beyond in-memory
# reference
storage_mode = PersistentStoreMode.AUTO
# Token expiry if not detected in seconds (below is 1 hr)
token_expiry = 3600
# The number of seconds to grace for early token renewal
# Below states that 10 seconds bfore our token expiry, we'll
# attempt to renew it
token_expiry_edge = 10
# Support attachments
attachment_support = True
@ -80,14 +96,18 @@ class NotifySendPulse(NotifyBase):
# Define object templates
templates = (
"{schema}://{from_email}/{client_id}/{client_secret}/",
"{schema}://{from_email}/{client_id}/{client_secret}/{targets}",
"{schema}://{user}@{host}/{client_secret}/",
"{schema}://{user}@{host}/{client_id}/{client_secret}/{targets}",
)
# Define our template arguments
template_tokens = dict(NotifyBase.template_tokens, **{
"from_email": {
"name": _("Source Email"),
"user": {
"name": _("User Name"),
"type": "string",
},
"host": {
"name": _("Domain"),
"type": "string",
"required": True,
},
@ -124,7 +144,7 @@ class NotifySendPulse(NotifyBase):
"from": {
"name": _("From Email"),
"type": "string",
"map_to": "from_email",
"map_to": "from_addr",
},
"cc": {
"name": _("Carbon Copy"),
@ -155,14 +175,60 @@ class NotifySendPulse(NotifyBase):
},
}
def __init__(self, from_email, client_id, client_secret, targets=None,
cc=None, bcc=None, template=None,
template_data=None, **kwargs):
def __init__(self, client_id, client_secret, from_addr=None, targets=None,
cc=None, bcc=None, template=None, template_data=None,
**kwargs):
"""
Initialize Notify SendPulse Object
"""
super().__init__(**kwargs)
# For tracking our email -> name lookups
self.names = {}
# Temporary from_addr to work with for parsing
_from_addr = [self.app_id, ""]
if self.user:
if self.host:
# Prepare the bases of our email
_from_addr = [_from_addr[0], "{}@{}".format(
re.split(r"[\s@]+", self.user)[0],
self.host,
)]
else:
result = is_email(self.user)
if result:
# Prepare the bases of our email and include domain
self.host = result["domain"]
_from_addr = [
result["name"] if result["name"]
else _from_addr[0], self.user]
if isinstance(from_addr, str):
result = is_email(from_addr)
if result:
_from_addr = (
result["name"] if result["name"] else _from_addr[0],
result["full_email"])
else:
# Only update the string but use the already detected info
_from_addr[0] = from_addr
result = is_email(_from_addr[1])
if not result:
# Parse Source domain based on from_addr
msg = "Invalid ~From~ email specified: {}".format(
"{} <{}>".format(_from_addr[0], _from_addr[1])
if _from_addr[0] else "{}".format(_from_addr[1]))
self.logger.warning(msg)
raise TypeError(msg)
# Store our lookup
self.from_addr = _from_addr[1]
self.names[_from_addr[1]] = _from_addr[0]
# Client ID
self.client_id = validate_regex(
client_id, *self.template_tokens["client_id"]["regex"])
@ -181,20 +247,6 @@ class NotifySendPulse(NotifyBase):
self.logger.warning(msg)
raise TypeError(msg)
result = is_email(from_email)
if not result:
msg = "Invalid ~From~ email specified: {}".format(from_email)
self.logger.warning(msg)
raise TypeError(msg)
# Tracks emails to name lookups (if they exist)
self.__email_map = {}
# Store from email address
self.from_email = result["full_email"]
self.__email_map[self.from_email] = result["name"] \
if result["name"] else self.app_id
# Acquire Targets (To Emails)
self.targets = []
@ -204,6 +256,8 @@ class NotifySendPulse(NotifyBase):
# Acquire Blind Carbon Copies
self.bcc = set()
# No template
self.template = None
if template:
try:
# Store our template
@ -225,9 +279,9 @@ class NotifySendPulse(NotifyBase):
for recipient in parse_emails(targets):
result = is_email(recipient)
if result:
self.targets.append(
(result["name"] if result["name"] else False,
result["full_email"]))
self.targets.append(result["full_email"])
if result["name"]:
self.names[result["full_email"]] = result["name"]
continue
self.logger.warning(
@ -237,16 +291,16 @@ class NotifySendPulse(NotifyBase):
else:
# If our target email list is empty we want to add ourselves to it
self.targets.append(self.from_email)
self.targets.append(self.from_addr)
# Validate recipients (cc:) and drop bad ones:
for recipient in parse_list(cc):
for recipient in parse_emails(cc):
result = is_email(recipient)
if result:
self.cc.add(result["full_email"])
if result["name"]:
self.__email_lookup[result["full_email"]] = result["name"]
self.names[result["full_email"]] = result["name"]
continue
self.logger.warning(
@ -255,13 +309,13 @@ class NotifySendPulse(NotifyBase):
)
# Validate recipients (bcc:) and drop bad ones:
for recipient in parse_list(bcc):
for recipient in parse_emails(bcc):
result = is_email(recipient)
if result:
self.bcc.add(result["full_email"])
if result["name"]:
self.__email_lookup[result["full_email"]] = result["name"]
self.names[result["full_email"]] = result["name"]
continue
self.logger.warning(
@ -271,7 +325,7 @@ class NotifySendPulse(NotifyBase):
if len(self.targets) == 0:
# Notify ourselves
self.targets.append(self.from_email)
self.targets.append(self.from_addr)
return
@ -296,7 +350,7 @@ class NotifySendPulse(NotifyBase):
# Handle our Carbon Copy Addresses
params["cc"] = ",".join([
formataddr(
(self.__email_lookup.get(e, False), e),
(self.names.get(e, False), e),
# Swap comma for it's escaped url code (if detected) since
# we're using that as a delimiter
charset="utf-8").replace(",", "%2C")
@ -306,7 +360,7 @@ class NotifySendPulse(NotifyBase):
# Handle our Blind Carbon Copy Addresses
params["bcc"] = ",".join([
formataddr(
(self.__email_lookup.get(e, False), e),
(self.names.get(e, False), e),
# Swap comma for it's escaped url code (if detected) since
# we're using that as a delimiter
charset="utf-8").replace(",", "%2C")
@ -316,6 +370,10 @@ class NotifySendPulse(NotifyBase):
# Handle our Template ID if if was specified
params["template"] = self.template
# handle from=
if self.names[self.from_addr] != self.app_id:
params["from"] = self.names[self.from_addr]
# Append our template_data into our parameter list
params.update(
{"+{}".format(k): v for k, v in self.template_data.items()})
@ -323,11 +381,11 @@ class NotifySendPulse(NotifyBase):
# a simple boolean check as to whether we display our target emails
# or not
has_targets = \
not (len(self.targets) == 1 and self.targets[0] == self.from_email)
not (len(self.targets) == 1 and self.targets[0] == self.from_addr)
return "{schema}://{source}/{cid}/{secret}/{targets}?{params}".format(
schema=self.secure_protocol,
source=self.from_email,
source=self.from_addr,
cid=self.pprint(self.client_id, privacy, safe=""),
secret=self.pprint(self.client_secret, privacy, safe=""),
targets="" if not has_targets else "/".join(
@ -341,17 +399,58 @@ class NotifySendPulse(NotifyBase):
"""
return len(self.targets)
def login(self):
"""
Authenticates with the server to get a access_token
"""
self.store.clear("access_token")
payload = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret,
}
success, response = self._fetch(self.notify_oauth_url, payload)
if not success:
return False
access_token = response.get("access_token")
# If we get here, we're authenticated
try:
expires = \
int(response.get("expires_in")) - self.token_expiry_edge
if expires <= self.token_expiry_edge:
self.logger.error(
"SendPulse token expiry limit returned was invalid")
return False
elif expires > self.token_expiry:
self.logger.warning(
"SendPulse token expiry limit fixed to: {}s"
.format(self.token_expiry))
expires = self.token_expiry - self.token_expiry_edge
except (AttributeError, TypeError, ValueError):
# expires_in was not an integer
self.logger.warning(
"SendPulse token expiry limit presumed to be: {}s".format(
self.token_expiry))
expires = self.token_expiry - self.token_expiry_edge
self.store.set("access_token", access_token, expires=expires)
return access_token
def send(self, body, title="", notify_type=NotifyType.INFO, attach=None,
**kwargs):
"""
Perform SendPulse Notification
"""
headers = {
"User-Agent": self.app_id,
"Content-Type": "application/json",
"Authorization": "Bearer {}".format(self.apikey),
}
access_token = self.store.get("access_token") or self.login()
if not access_token:
return False
# error tracking (used for function return)
has_error = False
@ -360,8 +459,8 @@ class NotifySendPulse(NotifyBase):
_payload = {
"email": {
"from": {
"name": self.from_email[0],
"email": self.from_email[1],
"name": self.names[self.from_addr],
"email": self.from_addr,
},
# To is populated further on
"to": [],
@ -445,8 +544,8 @@ class NotifySendPulse(NotifyBase):
to = {
"email": target
}
if target in self.__email_lookup:
to["name"] = self.__email_lookup[target]
if target in self.names:
to["name"] = self.names[target]
# Set our target
payload["email"]["to"] = [to]
@ -457,8 +556,8 @@ class NotifySendPulse(NotifyBase):
item = {
"email": email,
}
if email in self.__email_lookup:
item["name"] = self.__email_lookup[email]
if email in self.names:
item["name"] = self.names[email]
payload["email"]["cc"].append(item)
@ -468,63 +567,119 @@ class NotifySendPulse(NotifyBase):
item = {
"email": email,
}
if email in self.__email_lookup:
item["name"] = self.__email_lookup[email]
if email in self.names:
item["name"] = self.names[email]
payload["email"]["bcc"].append(item)
self.logger.debug("SendPulse POST URL: %s (cert_verify=%r)",
self.notify_email_url, self.verify_certificate,
)
self.logger.debug("SendPulse Payload: %s", str(payload))
# Perform our post
success, response = self._fetch(
self.notify_email_url, payload, target, retry=1)
if not success:
has_error = True
continue
return not has_error
def _fetch(self, url, payload, target=None, retry=0):
"""
Wrapper to request.post() to manage it's response better and make
the send() function cleaner and easier to maintain.
This function returns True if the _post was successful and False
if it wasn't.
"""
headers = {
"User-Agent": self.app_id,
"Content-Type": "application/json",
}
access_token = self.store.get("access_token")
if access_token:
headers.update({"Authorization": f"Bearer {access_token}"})
self.logger.debug("SendPulse POST URL: {} (cert_verify={!r})".format(
url, self.verify_certificate,
))
self.logger.debug("SendPulse Payload: {}".format(str(payload)))
# Prepare our default response
response = {}
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
self.notify_email_url,
url,
data=dumps(payload),
headers=headers,
verify=self.verify_certificate,
timeout=self.request_timeout,
)
if r.status_code not in (
try:
response = loads(r.content)
except (AttributeError, TypeError, ValueError):
# This gets thrown if we can't parse our JSON Response
# - ValueError = r.content is Unparsable
# - TypeError = r.content is None
# - AttributeError = r is None
self.logger.warning("Invalid response from SendPulse server.")
self.logger.debug(
"Response Details:\r\n{}".format(r.content))
return (False, {})
# Reference status code
status_code = r.status_code
# Key likely expired, we'll reset it and try one more time
if status_code == requests.codes.unauthorized \
and retry and self.login():
return self._fetch(url, payload, target, retry=retry - 1)
if status_code not in (
requests.codes.ok, requests.codes.accepted):
# We had a problem
status_str = \
NotifySendPulse.http_response_code_lookup(
r.status_code)
status_code)
if target:
self.logger.warning(
"Failed to send SendPulse notification to {}: "
"{}{}error={}.".format(
target,
status_str,
", " if status_str else "",
r.status_code))
status_code))
else:
self.logger.warning(
"SendPulse Authentication Request failed: "
"{}{}error={}.".format(
status_str,
", " if status_str else "",
status_code))
self.logger.debug(
"Response Details:\r\n{}".format(r.content))
# Mark our failure
has_error = True
continue
else:
if target:
self.logger.info(
"Sent SendPulse notification to {}.".format(target))
else:
self.logger.debug("SendPulse authentication successful")
return (True, response)
except requests.RequestException as e:
self.logger.warning(
"A Connection error occurred sending SendPulse "
"notification to {}.".format(target))
self.logger.debug("Socket Exception: %s", str(e))
self.logger.debug("Socket Exception: {}".format(str(e)))
# Mark our failure
has_error = True
continue
return not has_error
return (False, response)
@staticmethod
def parse_url(url):
@ -534,13 +689,22 @@ class NotifySendPulse(NotifyBase):
"""
results = NotifyBase.parse_url(url)
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results
# Define our minimum requirements; defining them now saves us from
# having to if/else all kinds of branches below...
results["from_addr"] = None
results["client_id"] = None
results["client_secret"] = None
# Prepare our targets
results["targets"] = []
# Our URL looks like this:
# {schema}://{from_email}:{client_id}/{client_secret}/{targets}
# {schema}://{from_addr}:{client_id}/{client_secret}/{targets}
#
# which actually equates to:
# {schema}://{user}@{host}/{client_id}/{client_secret}
@ -548,10 +712,27 @@ class NotifySendPulse(NotifyBase):
# ^ ^
# | |
# -from addr-
if "from" in results["qsd"]:
results["from_addr"] = \
NotifySendPulse.unquote(results["qsd"]["from"].rstrip())
if is_email(results["from_addr"]):
# Our hostname is free'd up to be interpreted as part of the
# targets
results["targets"].append(
NotifySendPulse.unquote(results["host"]))
results["host"] = ""
if "user" in results["qsd"] and \
is_email(NotifySendPulse.unquote(results["user"])):
# Our hostname is free'd up to be interpreted as part of the
# targets
results["targets"].append(NotifySendPulse.unquote(results["host"]))
results["host"] = ""
# Get our potential email targets
# First 2 elements are the client_id and client_secret
results["targets"] = NotifySendPulse.split_path(results["fullpath"])
results["targets"] += NotifySendPulse.split_path(results["fullpath"])
# check for our client id
if "id" in results["qsd"] and len(results["qsd"]["id"]):
# Store our Client ID
@ -562,9 +743,6 @@ class NotifySendPulse(NotifyBase):
# Store our Client ID
results["client_id"] = results["targets"].pop(0)
else: # Not defined
results["client_id"] = None
if "secret" in results["qsd"] and len(results["qsd"]["secret"]):
# Store our Client Secret
results["client_secret"] = \
@ -574,47 +752,20 @@ class NotifySendPulse(NotifyBase):
# Store our Client Secret
results["client_secret"] = results["targets"].pop(0)
else: # Not defined
results["client_secret"] = None
if "from" in results["qsd"] and len(results["qsd"]["from"]):
results["from_email"] = \
NotifySendPulse.unquote(results["qsd"]["from_email"])
# This means any user@host is the To Address if defined
if results.get("user") and results.get("host"):
results["targets"] += "{}@{}".format(
NotifySendPulse.unquote(
results["password"]
if results["password"] else results["user"]),
NotifySendPulse.unquote(results["host"]),
)
elif results.get("user") and results.get("host"):
results["from_email"] = "{}@{}".format(
NotifySendPulse.unquote(
results["password"]
if results["password"] else results["user"]),
NotifySendPulse.unquote(results["host"]),
)
else: # Not defined
results["from_email"] = None
# The 'to' makes it easier to use yaml configuration
if "to" in results["qsd"] and len(results["qsd"]["to"]):
results["targets"] += \
NotifySendPulse.parse_list(results["qsd"]["to"])
results["targets"].append(
NotifySendPulse.unquote(results["qsd"]["to"]))
# Handle Carbon Copy Addresses
if "cc" in results["qsd"] and len(results["qsd"]["cc"]):
results["cc"] = \
NotifySendPulse.parse_list(results["qsd"]["cc"])
NotifySendPulse.unquote(results["qsd"]["cc"])
# Handle Blind Carbon Copy Addresses
if "bcc" in results["qsd"] and len(results["qsd"]["bcc"]):
results["bcc"] = \
NotifySendPulse.parse_list(results["qsd"]["bcc"])
NotifySendPulse.unquote(results["qsd"]["bcc"])
# Handle Blind Carbon Copy Addresses
if "template" in results["qsd"] and len(results["qsd"]["template"]):

View File

@ -2028,6 +2028,9 @@ def test_parse_emails():
assert len(results) == len(emails)
for email in emails:
assert email in results
is_email = utils.parse.is_email(email)
assert is_email
assert is_email.get('name')
# pass the entries in as a list
results = utils.parse.parse_emails(emails)
@ -2115,7 +2118,8 @@ def test_parse_urls():
# Commas and spaces found inside URLs are ignored
urls = [
(
"mailgun://noreply@sandbox.mailgun.org/apikey/?to=test@example.com,test2@example.com,,"
"mailgun://noreply@sandbox.mailgun.org/apikey/"
"?to=test@example.com,test2@example.com,,"
" abcd@example.com"
),
(
@ -3029,7 +3033,8 @@ def test_cwe312_url():
assert (
utils.cwe312.cwe312_url(
"slack://mybot@xoxb-43598234231-3248932482278-BZK5Wj15B9mPh1RkShJoCZ44"
"slack://mybot@xoxb-43598234231-3248932482278"
"-BZK5Wj15B9mPh1RkShJoCZ44"
"/lead2gold@gmail.com"
)
== "slack://mybot@x...4/l...m"

View File

@ -1,7 +1,7 @@
# BSD 2-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2024, Chris Caron <lead2gold@gmail.com>
# Copyright (c) 2025, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
@ -25,6 +25,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from json import dumps, loads
# Disable logging for a cleaner testing output
import logging
import os
@ -39,73 +41,215 @@ from apprise.plugins.sendpulse import NotifySendPulse
logging.disable(logging.CRITICAL)
SENDPULSE_GOOD_RESPONSE = dumps({
"access_token": "abc123",
"expires_in": 3600,
})
SENDPULSE_BAD_RESPONSE = "{"
# Attachment Directory
TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), "var")
# Our Testing URLs
apprise_url_tests = (
("sendpulse://", {
"instance": None,
"instance": TypeError,
}),
("sendpulse://:@/", {
"instance": None,
"instance": TypeError,
}),
("sendpulse://abcd", {
# Just an broken email (no email, client_id or secret)
"instance": None,
# invalid from email
"instance": TypeError,
}),
("sendpulse://abcd@host", {
("sendpulse://abcd@host.com", {
# Just an Email specified, no client_id or client_secret
"instance": None,
"instance": TypeError,
}),
("sendpulse://user@example.com/client_id/client_secret/", {
("sendpulse://user@example.com/client_id/cs/?template=invalid", {
# Invalid template
"instance": TypeError,
}),
("sendpulse://user@example.com/client_id/cs1/?template=123", {
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://user@example.com/client_id/client_secret/"
("sendpulse://user@example.com/client_id/cs1/", {
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://user@example.com/client_id/cs1/?format=text", {
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://user@example.com/client_id/cs1/?format=html", {
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://chris@example.com/client_id/cs1/?from=Chris", {
# Set name only
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://?id=ci&secret=cs&user=chris@example.com", {
# Set login through user= only
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://?id=ci&secret=cs&user=Chris<chris@example.com>", {
# Set login through user= only
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://?id=ci&secret=cs&user=chris", {
# Set login through user= only - invaild email
"instance": TypeError,
}),
("sendpulse://example.com/client_id/cs1/?user=chris", {
# Set user as a name only
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://client_id/cs1/?user=chris@example.ca", {
# Set user as email
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://client_id/cs1/?from=Chris<chris@example.com>", {
# set full email with name
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://?from=Chris<chris@example.com>&id=ci&secret=cs", {
# leverage all get params from URL
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://user@example.com/client_id/cs1a/", {
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_BAD_RESPONSE,
# Notify will fail because auth failed
"response": False,
}),
("sendpulse://user@example.com/client_id/cs2/"
"?bcc=l2g@nuxref.com", {
# A good email with Blind Carbon Copy
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://user@example.com/client_id/client_secret/"
("sendpulse://user@example.com/client_id/cs2/"
"?bcc=invalid", {
# A good email with Blind Carbon Copy
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://user@example.com/client_id/cs3/"
"?cc=l2g@nuxref.com", {
# A good email with Carbon Copy
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://user@example.com/client_id/client_secret/"
"?to=l2g@nuxref.com", {
("sendpulse://user@example.com/client_id/cs4/"
"?cc=Chris<l2g@nuxref.com>", {
# A good email with Carbon Copy
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://user@example.com/client_id/client_secret/"
"?template=1234", {
# A good email with a template + no substitutions
("sendpulse://user@example.com/client_id/cs5/"
"?cc=invalid", {
# A good email with Carbon Copy
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://user@example.com/client_id/client_secret/"
("sendpulse://user@example.com/client_id/cs6/"
"?to=invalid", {
# an invalid to email
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://user@example.com/client_id/cs7/chris@example.com", {
# An email with a designated to email
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://user@example.com/client_id/cs8/"
"?to=Chris<chris@example.com>", {
# An email with a full name in in To field
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://user@example.com/client_id/cs9/"
"chris@example.com/chris2@example.com/Test<test@test.com>", {
# Several emails to notify
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://user@example.com/client_id/cs10/"
"?cc=Chris<chris@example.com>", {
# An email with a full name in cc
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://user@example.com/client_id/cs11/"
"?cc=chris@example.com", {
# An email with a full name in cc
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://user@example.com/client_id/cs12/"
"?bcc=Chris<chris@example.com>", {
# An email with a full name in bcc
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://user@example.com/client_id/cs13/"
"?bcc=chris@example.com", {
# An email with a full name in bcc
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://user@example.com/client_id/cs14/"
"?to=Chris<chris@example.com>", {
# An email with a full name in bcc
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://user@example.com/client_id/cs15/"
"?to=chris@example.com", {
# An email with a full name in bcc
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://user@example.com/client_id/cs16/"
"?template=1234&+sub=value&+sub2=value2", {
# A good email with a template + substitutions
"instance": NotifySendPulse,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
# Our expected url(privacy=True) startswith() response:
"privacy_url": "sendpulse://a...d:user@example.com/",
"privacy_url": "sendpulse://user@example.com/c...d/c...6/",
}),
("sendpulse://user@example.com/client_id/client_secret/", {
("sendpulse://user@example.com/client_id/cs17/", {
"instance": NotifySendPulse,
# force a failure
"response": False,
"requests_response_code": requests.codes.internal_server_error,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://user@example.com/client_id/client_secret/", {
("sendpulse://user@example.com/client_id/cs18/", {
"instance": NotifySendPulse,
# throw a bizzare code forcing us to fail to look it up
"response": False,
"requests_response_code": 999,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
("sendpulse://user@example.com/client_id/client_secret/", {
("sendpulse://user@example.com/client_id/cs19/", {
"instance": NotifySendPulse,
# Throws a series of connection and transfer exceptions when this flag
# is set and tests that we gracfully handle them
"test_requests_exceptions": True,
"requests_response_text": SENDPULSE_GOOD_RESPONSE,
}),
)
@ -120,11 +264,227 @@ def test_plugin_sendpulse_urls():
AppriseURLTester(tests=apprise_url_tests).run_all()
@mock.patch("requests.get")
@mock.patch("requests.post")
def test_plugin_sendpulse_edge_cases(mock_post, mock_get):
def test_plugin_sendpulse_edge_cases(mock_post):
"""
NotifySendPulse() Edge Cases
"""
request = mock.Mock()
request.status_code = requests.codes.ok
request.content = SENDPULSE_GOOD_RESPONSE
# Prepare Mock
mock_post.return_value = request
obj = Apprise.instantiate(
"sendpulse://user@example.com/ci/cs/Test<test@example.com>")
assert obj.notify(
body="body", title="title", notify_type=NotifyType.INFO) is True
# Test our call count
assert mock_post.call_count == 2
# Authentication
assert mock_post.call_args_list[0][0][0] == \
"https://api.sendpulse.com/oauth/access_token"
payload = loads(mock_post.call_args_list[0][1]["data"])
assert payload == {
"grant_type": "client_credentials",
"client_id": "ci",
"client_secret": "cs",
}
assert mock_post.call_args_list[1][0][0] == \
"https://api.sendpulse.com/smtp/emails"
payload = loads(mock_post.call_args_list[1][1]["data"])
assert payload == {
"email": {
"from": {
"email": "user@example.com", "name": "Apprise"
},
"to": [{"email": "test@example.com", "name": "Test"}],
"subject": "title", "text": "body", "html": "Ym9keQ=="}}
mock_post.reset_mock()
obj = Apprise.instantiate("sendpulse://user@example.com/ci/cs/?from=John")
assert obj.notify(
body="body", title="title", notify_type=NotifyType.INFO) is True
# Test our call count
assert mock_post.call_count == 2
# Authentication
assert mock_post.call_args_list[0][0][0] == \
"https://api.sendpulse.com/oauth/access_token"
payload = loads(mock_post.call_args_list[0][1]["data"])
assert payload == {
"grant_type": "client_credentials",
"client_id": "ci",
"client_secret": "cs",
}
assert mock_post.call_args_list[1][0][0] == \
"https://api.sendpulse.com/smtp/emails"
payload = loads(mock_post.call_args_list[1][1]["data"])
assert payload == {
"email": {
"from": {
"email": "user@example.com", "name": "John"
},
"to": [{"email": "user@example.com", "name": "John"}],
"subject": "title", "text": "body", "html": "Ym9keQ=="}}
mock_post.reset_mock()
# Second call no longer needs to authenticate
assert obj.notify(
body="body", title="title", notify_type=NotifyType.INFO) is True
assert mock_post.call_count == 1
assert mock_post.call_args_list[0][0][0] == \
"https://api.sendpulse.com/smtp/emails"
# force an exception
mock_post.side_effect = requests.RequestException
assert obj.notify(
body="body", title="title", notify_type=NotifyType.INFO) is False
# Set an invalid return code
mock_post.side_effect = None
request.status_code = 403
assert obj.notify(
body="body", title="title", notify_type=NotifyType.INFO) is False
# Test re-authentication
mock_post.reset_mock()
request = mock.Mock()
obj = Apprise.instantiate("sendpulse://usr2@example.com/ci/cs/?from=Retry")
class sendpulse:
def __init__(self):
# 200 login okay
# 401 on retrival
# recursive re-attempt to login returns 200
# fetch after works
self._side_effect = iter([
requests.codes.ok, requests.codes.unauthorized,
requests.codes.ok, requests.codes.ok,
])
@property
def status_code(self):
return next(self._side_effect)
@property
def content(self):
return SENDPULSE_GOOD_RESPONSE
mock_post.return_value = sendpulse()
assert obj.notify(
body="body", title="title", notify_type=NotifyType.INFO) is True
assert mock_post.call_count == 4
# Authentication
assert mock_post.call_args_list[0][0][0] == \
"https://api.sendpulse.com/oauth/access_token"
# 401 received
assert mock_post.call_args_list[1][0][0] == \
"https://api.sendpulse.com/smtp/emails"
# Re-authenticate
assert mock_post.call_args_list[2][0][0] == \
"https://api.sendpulse.com/oauth/access_token"
# Try again
assert mock_post.call_args_list[3][0][0] == \
"https://api.sendpulse.com/smtp/emails"
# Test re-authentication (no recursive loops)
mock_post.reset_mock()
request = mock.Mock()
obj = Apprise.instantiate("sendpulse://usr2@example.com/ci/cs/?from=Retry")
class sendpulse:
def __init__(self):
# oauth always returns okay but notify returns 401
# recursive re-attempt only once
self._side_effect = iter([
requests.codes.ok, requests.codes.unauthorized,
requests.codes.ok, requests.codes.unauthorized,
requests.codes.ok, requests.codes.unauthorized,
requests.codes.ok, requests.codes.unauthorized,
requests.codes.ok, requests.codes.unauthorized,
requests.codes.ok, requests.codes.unauthorized,
requests.codes.ok, requests.codes.unauthorized,
requests.codes.ok, requests.codes.unauthorized,
])
@property
def status_code(self):
return next(self._side_effect)
@property
def content(self):
return SENDPULSE_GOOD_RESPONSE
mock_post.return_value = sendpulse()
assert obj.notify(
body="body", title="title", notify_type=NotifyType.INFO) is False
assert mock_post.call_count == 4
# Authentication
assert mock_post.call_args_list[0][0][0] == \
"https://api.sendpulse.com/oauth/access_token"
# 401 received
assert mock_post.call_args_list[1][0][0] == \
"https://api.sendpulse.com/smtp/emails"
# Re-authenticate
assert mock_post.call_args_list[2][0][0] == \
"https://api.sendpulse.com/oauth/access_token"
# Last failed attempt
assert mock_post.call_args_list[3][0][0] == \
"https://api.sendpulse.com/smtp/emails"
mock_post.side_effect = None
request = mock.Mock()
request.status_code = requests.codes.ok
request.content = SENDPULSE_GOOD_RESPONSE
mock_post.return_value = request
for expires_in in (None, -1, "garbage", 3600, 300000):
request.content = dumps({
"access_token": "abc123",
"expires_in": expires_in,
})
# Instantiate our object
obj = Apprise.instantiate("sendpulse://user@example.com/ci/cs/")
# Test variations of responses
obj.notify(
body="body", title="title", notify_type=NotifyType.INFO)
# expires_in is missing
request.content = dumps({
"access_token": "abc123",
})
# Instantiate our object
obj = Apprise.instantiate("sendpulse://user@example.com/ci/cs/")
assert obj.notify(
body="body", title="title", notify_type=NotifyType.INFO) is True
def test_plugin_sendpulse_fail_cases():
"""
NotifySendPulse() Fail Cases
"""
@ -132,39 +492,38 @@ def test_plugin_sendpulse_edge_cases(mock_post, mock_get):
with pytest.raises(TypeError):
NotifySendPulse(
client_id="abcd", client_secret=None,
from_email="user@example.com")
from_addr="user@example.com")
with pytest.raises(TypeError):
NotifySendPulse(
client_id=None, client_secret="abcd",
from_email="user@example.com")
client_id=None, client_secret="abcd123",
from_addr="user@example.com")
# invalid from email
with pytest.raises(TypeError):
NotifySendPulse(
client_id="abcd", client_secret="abcd", from_email="!invalid")
client_id="abcd", client_secret="abcd456", from_addr="!invalid")
# no email
with pytest.raises(TypeError):
NotifySendPulse(
client_id="abcd", client_secret="abcd", from_email=None)
client_id="abcd", client_secret="abcd789", from_addr=None)
# Invalid To email address
NotifySendPulse(
client_id="abcd", client_secret="abcd",
from_email="user@example.com", targets="!invalid")
client_id="abcd", client_secret="abcd321",
from_addr="user@example.com", targets="!invalid")
# Test invalid bcc/cc entries mixed with good ones
assert isinstance(NotifySendPulse(
client_id="abcd", client_secret="abcd",
from_email="l2g@example.com",
client_id="abcd", client_secret="abcd654",
from_addr="l2g@example.com",
bcc=("abc@def.com", "!invalid"),
cc=("abc@test.org", "!invalid")), NotifySendPulse)
@mock.patch("requests.get")
@mock.patch("requests.post")
def test_plugin_sendpulse_attachments(mock_post, mock_get):
def test_plugin_sendpulse_attachments(mock_post):
"""
NotifySendPulse() Attachments
@ -172,21 +531,20 @@ def test_plugin_sendpulse_attachments(mock_post, mock_get):
request = mock.Mock()
request.status_code = requests.codes.ok
request.content = SENDPULSE_GOOD_RESPONSE
# Prepare Mock
mock_post.return_value = request
mock_get.return_value = request
path = os.path.join(TEST_VAR_DIR, "apprise-test.gif")
attach = AppriseAttachment(path)
obj = Apprise.instantiate("sendpulse://user@example.com/abcd/abcd")
obj = Apprise.instantiate("sendpulse://user@example.com/aaaa/bbbb")
assert isinstance(obj, NotifySendPulse)
assert obj.notify(
body="body", title="title", notify_type=NotifyType.INFO,
attach=attach) is True
mock_post.reset_mock()
mock_get.reset_mock()
# Try again in a use case where we can't access the file
with mock.patch("os.path.isfile", return_value=False):