diff --git a/README.md b/README.md
index e7532ad4..4967ba49 100644
--- a/README.md
+++ b/README.md
@@ -121,6 +121,7 @@ The table below identifies the services this tool supports and some example serv
| [RSyslog](https://github.com/caronc/apprise/wiki/Notify_rsyslog) | rsyslog:// | (UDP) 514 | rsyslog://hostname
rsyslog://hostname/Facility
| [Ryver](https://github.com/caronc/apprise/wiki/Notify_ryver) | ryver:// | (TCP) 443 | ryver://Organization/Token
ryver://botname@Organization/Token
| [SendGrid](https://github.com/caronc/apprise/wiki/Notify_sendgrid) | sendgrid:// | (TCP) 443 | sendgrid://APIToken:FromEmail/
sendgrid://APIToken:FromEmail/ToEmail
sendgrid://APIToken:FromEmail/ToEmail1/ToEmail2/ToEmailN/
+| [SendPulse](https://github.com/caronc/apprise/wiki/Notify_sendpulse) | sendpulse:// | (TCP) 443 | sendpulse://user@host/ClientId/ClientSecret
sendpulse://user@host/ClientId/clientSecret/ToEmail
sendpulse://user@host/ClientId/ClientSecret/ToEmail1/ToEmail2/ToEmailN/
| [ServerChan](https://github.com/caronc/apprise/wiki/Notify_serverchan) | schan:// | (TCP) 443 | schan://sendkey/
| [Signal API](https://github.com/caronc/apprise/wiki/Notify_signal) | signal:// or signals:// | (TCP) 80 or 443 | signal://hostname:port/FromPhoneNo
signal://hostname:port/FromPhoneNo/ToPhoneNo
signal://hostname:port/FromPhoneNo/ToPhoneNo1/ToPhoneNo2/ToPhoneNoN/
| [SIGNL4](https://github.com/caronc/apprise/wiki/Notify_signl4) | signl4:// | (TCP) 80 or 443 | signl4://hostname
diff --git a/apprise/plugins/sendpulse.py b/apprise/plugins/sendpulse.py
new file mode 100644
index 00000000..821f53dd
--- /dev/null
+++ b/apprise/plugins/sendpulse.py
@@ -0,0 +1,778 @@
+# BSD 2-Clause License
+#
+# Apprise - Push Notification Library.
+# Copyright (c) 2025, Chris Caron
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice,
+# this list of conditions and the following disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
+# and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+
+# Simple API Reference:
+# - https://sendpulse.com/integrations/api/smtp
+
+import base64
+from email.utils import formataddr
+from json import dumps, loads
+import re
+
+import requests
+
+from .. import exception
+from ..common import NotifyFormat, NotifyType, PersistentStoreMode
+from ..conversion import convert_between
+from ..locale import gettext_lazy as _
+from ..utils.parse import is_email, parse_emails, validate_regex
+from .base import NotifyBase
+
+
+class NotifySendPulse(NotifyBase):
+ """
+ A wrapper for Notify SendPulse Notifications
+ """
+
+ # The default descriptive name associated with the Notification
+ service_name = "SendPulse"
+
+ # The services URL
+ service_url = "https://sendpulse.com"
+
+ # The default secure protocol
+ secure_protocol = "sendpulse"
+
+ # A URL that takes you to the setup/help of the specific protocol
+ setup_url = "https://github.com/caronc/apprise/wiki/Notify_sendpulse"
+
+ # Default to markdown
+ notify_format = NotifyFormat.HTML
+
+ # The default Email API URL to use
+ notify_email_url = "https://api.sendpulse.com/smtp/emails"
+
+ # Our OAuth Query
+ notify_oauth_url = "https://api.sendpulse.com/oauth/access_token"
+
+ # Support attachments
+ attachment_support = True
+
+ # Allow 300 requests per minute.
+ # 60/300 = 0.2
+ request_rate_per_sec = 0.2
+
+ # Our default is to no not use persistent storage beyond in-memory
+ # reference
+ storage_mode = PersistentStoreMode.AUTO
+
+ # Token expiry if not detected in seconds (below is 1 hr)
+ token_expiry = 3600
+
+ # The number of seconds to grace for early token renewal
+ # Below states that 10 seconds bfore our token expiry, we'll
+ # attempt to renew it
+ token_expiry_edge = 10
+
+ # Support attachments
+ attachment_support = True
+
+ # The default subject to use if one isn't specified.
+ default_empty_subject = ""
+
+ # Define object templates
+ templates = (
+ "{schema}://{user}@{host}/{client_secret}/",
+ "{schema}://{user}@{host}/{client_id}/{client_secret}/{targets}",
+ )
+
+ # Define our template arguments
+ template_tokens = dict(NotifyBase.template_tokens, **{
+ "user": {
+ "name": _("User Name"),
+ "type": "string",
+ },
+ "host": {
+ "name": _("Domain"),
+ "type": "string",
+ "required": True,
+ },
+ "client_id": {
+ "name": _("Client ID"),
+ "type": "string",
+ "required": True,
+ "private": True,
+ "regex": (r"^[A-Z0-9._-]+$", "i"),
+ },
+ "client_secret": {
+ "name": _("Client Secret"),
+ "type": "string",
+ "required": True,
+ "private": True,
+ "regex": (r"^[A-Z0-9._-]+$", "i"),
+ },
+ "target_email": {
+ "name": _("Target Email"),
+ "type": "string",
+ "map_to": "targets",
+ },
+ "targets": {
+ "name": _("Targets"),
+ "type": "list:string",
+ },
+ })
+
+ # Define our template arguments
+ template_args = dict(NotifyBase.template_args, **{
+ "to": {
+ "alias_of": "targets",
+ },
+ "from": {
+ "name": _("From Email"),
+ "type": "string",
+ "map_to": "from_addr",
+ },
+ "cc": {
+ "name": _("Carbon Copy"),
+ "type": "list:string",
+ },
+ "bcc": {
+ "name": _("Blind Carbon Copy"),
+ "type": "list:string",
+ },
+ "template": {
+ # The template ID is an integer
+ "name": _("Template ID"),
+ "type": "int",
+ },
+ "id": {
+ "alias_of": "client_id",
+ },
+ "secret": {
+ "alias_of": "client_secret",
+ }
+ })
+
+ # Support Template Dynamic Variables (Substitutions)
+ template_kwargs = {
+ "template_data": {
+ "name": _("Template Data"),
+ "prefix": "+",
+ },
+ }
+
+ def __init__(self, client_id, client_secret, from_addr=None, targets=None,
+ cc=None, bcc=None, template=None, template_data=None,
+ **kwargs):
+ """
+ Initialize Notify SendPulse Object
+ """
+ super().__init__(**kwargs)
+
+ # For tracking our email -> name lookups
+ self.names = {}
+
+ # Temporary from_addr to work with for parsing
+ _from_addr = [self.app_id, ""]
+
+ if self.user:
+ if self.host:
+ # Prepare the bases of our email
+ _from_addr = [_from_addr[0], "{}@{}".format(
+ re.split(r"[\s@]+", self.user)[0],
+ self.host,
+ )]
+
+ else:
+ result = is_email(self.user)
+ if result:
+ # Prepare the bases of our email and include domain
+ self.host = result["domain"]
+ _from_addr = [
+ result["name"] if result["name"]
+ else _from_addr[0], self.user]
+
+ if isinstance(from_addr, str):
+ result = is_email(from_addr)
+ if result:
+ _from_addr = (
+ result["name"] if result["name"] else _from_addr[0],
+ result["full_email"])
+ else:
+ # Only update the string but use the already detected info
+ _from_addr[0] = from_addr
+
+ result = is_email(_from_addr[1])
+ if not result:
+ # Parse Source domain based on from_addr
+ msg = "Invalid ~From~ email specified: {}".format(
+ "{} <{}>".format(_from_addr[0], _from_addr[1])
+ if _from_addr[0] else "{}".format(_from_addr[1]))
+ self.logger.warning(msg)
+ raise TypeError(msg)
+
+ # Store our lookup
+ self.from_addr = _from_addr[1]
+ self.names[_from_addr[1]] = _from_addr[0]
+
+ # Client ID
+ self.client_id = validate_regex(
+ client_id, *self.template_tokens["client_id"]["regex"])
+ if not self.client_id:
+ msg = "An invalid SendPulse Client ID " \
+ "({}) was specified.".format(client_id)
+ self.logger.warning(msg)
+ raise TypeError(msg)
+
+ # Client Secret
+ self.client_secret = validate_regex(
+ client_secret, *self.template_tokens["client_secret"]["regex"])
+ if not self.client_secret:
+ msg = "An invalid SendPulse Client Secret " \
+ "({}) was specified.".format(client_secret)
+ self.logger.warning(msg)
+ raise TypeError(msg)
+
+ # Acquire Targets (To Emails)
+ self.targets = []
+
+ # Acquire Carbon Copies
+ self.cc = set()
+
+ # Acquire Blind Carbon Copies
+ self.bcc = set()
+
+ # No template
+ self.template = None
+ if template:
+ try:
+ # Store our template
+ self.template = int(template)
+
+ except (TypeError, ValueError):
+ # Not a valid integer; ignore entry
+ err = "The SendPulse Template ID specified ({}) is invalid."\
+ .format(template)
+ self.logger.warning(err)
+ raise TypeError(err) from None
+
+ # Now our dynamic template data (if defined)
+ self.template_data = template_data \
+ if isinstance(template_data, dict) else {}
+
+ if targets:
+ # Validate recipients (to:) and drop bad ones:
+ for recipient in parse_emails(targets):
+ result = is_email(recipient)
+ if result:
+ self.targets.append(result["full_email"])
+ if result["name"]:
+ self.names[result["full_email"]] = result["name"]
+ continue
+
+ self.logger.warning(
+ "Dropped invalid To email "
+ "({}) specified.".format(recipient),
+ )
+
+ else:
+ # If our target email list is empty we want to add ourselves to it
+ self.targets.append(self.from_addr)
+
+ # Validate recipients (cc:) and drop bad ones:
+ for recipient in parse_emails(cc):
+
+ result = is_email(recipient)
+ if result:
+ self.cc.add(result["full_email"])
+ if result["name"]:
+ self.names[result["full_email"]] = result["name"]
+ continue
+
+ self.logger.warning(
+ "Dropped invalid Carbon Copy email "
+ "({}) specified.".format(recipient),
+ )
+
+ # Validate recipients (bcc:) and drop bad ones:
+ for recipient in parse_emails(bcc):
+
+ result = is_email(recipient)
+ if result:
+ self.bcc.add(result["full_email"])
+ if result["name"]:
+ self.names[result["full_email"]] = result["name"]
+ continue
+
+ self.logger.warning(
+ "Dropped invalid Blind Carbon Copy email "
+ "({}) specified.".format(recipient),
+ )
+
+ if len(self.targets) == 0:
+ # Notify ourselves
+ self.targets.append(self.from_addr)
+
+ return
+
+ @property
+ def url_identifier(self):
+ """
+ Returns all of the identifiers that make this URL unique from
+ another simliar one. Targets or end points should never be identified
+ here.
+ """
+ return (self.secure_protocol, self.client_id, self.client_secret)
+
+ def url(self, privacy=False, *args, **kwargs):
+ """
+ Returns the URL built dynamically based on specified arguments.
+ """
+
+ # Our URL parameters
+ params = self.url_parameters(privacy=privacy, *args, **kwargs)
+
+ if len(self.cc) > 0:
+ # Handle our Carbon Copy Addresses
+ params["cc"] = ",".join([
+ formataddr(
+ (self.names.get(e, False), e),
+ # Swap comma for it's escaped url code (if detected) since
+ # we're using that as a delimiter
+ charset="utf-8").replace(",", "%2C")
+ for e in self.cc])
+
+ if len(self.bcc) > 0:
+ # Handle our Blind Carbon Copy Addresses
+ params["bcc"] = ",".join([
+ formataddr(
+ (self.names.get(e, False), e),
+ # Swap comma for it's escaped url code (if detected) since
+ # we're using that as a delimiter
+ charset="utf-8").replace(",", "%2C")
+ for e in self.bcc])
+
+ if self.template:
+ # Handle our Template ID if if was specified
+ params["template"] = self.template
+
+ # handle from=
+ if self.names[self.from_addr] != self.app_id:
+ params["from"] = self.names[self.from_addr]
+
+ # Append our template_data into our parameter list
+ params.update(
+ {"+{}".format(k): v for k, v in self.template_data.items()})
+
+ # a simple boolean check as to whether we display our target emails
+ # or not
+ has_targets = \
+ not (len(self.targets) == 1 and self.targets[0] == self.from_addr)
+
+ return "{schema}://{source}/{cid}/{secret}/{targets}?{params}".format(
+ schema=self.secure_protocol,
+ source=self.from_addr,
+ cid=self.pprint(self.client_id, privacy, safe=""),
+ secret=self.pprint(self.client_secret, privacy, safe=""),
+ targets="" if not has_targets else "/".join(
+ [NotifySendPulse.quote(x, safe="") for x in self.targets]),
+ params=NotifySendPulse.urlencode(params),
+ )
+
+ def __len__(self):
+ """
+ Returns the number of targets associated with this notification
+ """
+ return len(self.targets)
+
+ def login(self):
+ """
+ Authenticates with the server to get a access_token
+ """
+ self.store.clear("access_token")
+ payload = {
+ "grant_type": "client_credentials",
+ "client_id": self.client_id,
+ "client_secret": self.client_secret,
+ }
+
+ success, response = self._fetch(self.notify_oauth_url, payload)
+ if not success:
+ return False
+
+ access_token = response.get("access_token")
+
+ # If we get here, we're authenticated
+ try:
+ expires = \
+ int(response.get("expires_in")) - self.token_expiry_edge
+ if expires <= self.token_expiry_edge:
+ self.logger.error(
+ "SendPulse token expiry limit returned was invalid")
+ return False
+
+ elif expires > self.token_expiry:
+ self.logger.warning(
+ "SendPulse token expiry limit fixed to: {}s"
+ .format(self.token_expiry))
+ expires = self.token_expiry - self.token_expiry_edge
+
+ except (AttributeError, TypeError, ValueError):
+ # expires_in was not an integer
+ self.logger.warning(
+ "SendPulse token expiry limit presumed to be: {}s".format(
+ self.token_expiry))
+ expires = self.token_expiry - self.token_expiry_edge
+
+ self.store.set("access_token", access_token, expires=expires)
+
+ return access_token
+
+ def send(self, body, title="", notify_type=NotifyType.INFO, attach=None,
+ **kwargs):
+ """
+ Perform SendPulse Notification
+ """
+
+ access_token = self.store.get("access_token") or self.login()
+ if not access_token:
+ return False
+
+ # error tracking (used for function return)
+ has_error = False
+
+ # A Simple Email Payload Template
+ _payload = {
+ "email": {
+ "from": {
+ "name": self.names[self.from_addr],
+ "email": self.from_addr,
+ },
+ # To is populated further on
+ "to": [],
+ # A subject is a requirement, so if none is specified we must
+ # set a default with at least 1 character or SendPulse will
+ # deny our request
+ "subject": title if title else self.default_empty_subject,
+ }
+ }
+
+ # Prepare Email Message
+ if self.notify_format == NotifyFormat.HTML:
+ # HTML
+ _payload["email"].update({
+ "text": convert_between(
+ NotifyFormat.HTML, NotifyFormat.TEXT, body),
+ "html": base64.b64encode(body.encode("utf-8")).decode("ascii"),
+ })
+
+ else: # Text
+ _payload["email"]["text"] = body
+
+ if attach and self.attachment_support:
+ attachments = {}
+
+ # Send our attachments
+ for no, attachment in enumerate(attach, start=1):
+ # Perform some simple error checking
+ if not attachment:
+ # We could not access the attachment
+ self.logger.error(
+ "Could not access SendPulse attachment {}.".format(
+ attachment.url(privacy=True)))
+ return False
+
+ try:
+ attachments[
+ attachment.name if attachment.name
+ else f"file{no:03}.dat"] = attachment.base64()
+
+ except exception.AppriseException:
+ # We could not access the attachment
+ self.logger.error(
+ "Could not access SendPulse attachment {}.".format(
+ attachment.url(privacy=True)))
+ return False
+
+ self.logger.debug(
+ "Appending SendPulse attachment {}".format(
+ attachment.url(privacy=True)))
+
+ # Append our attachments to the payload
+ _payload["email"].update({
+ "attachments_binary": attachments,
+ })
+
+ if self.template:
+ _payload["email"].update({
+ "template": {
+ "id": self.template,
+ "variables": self.template_data,
+ }})
+
+ targets = list(self.targets)
+ while len(targets) > 0:
+ target = targets.pop(0)
+
+ # Create a copy of our template
+ payload = _payload.copy()
+
+ # the cc, bcc, to field must be unique or SendMail will fail, the
+ # below code prepares this by ensuring the target isn't in the cc
+ # list or bcc list. It also makes sure the cc list does not contain
+ # any of the bcc entries
+ cc = (self.cc - self.bcc - {target})
+ bcc = (self.bcc - {target})
+
+ #
+ # prepare our 'to'
+ #
+ to = {
+ "email": target
+ }
+ if target in self.names:
+ to["name"] = self.names[target]
+
+ # Set our target
+ payload["email"]["to"] = [to]
+
+ if len(cc):
+ payload["email"]["cc"] = []
+ for email in cc:
+ item = {
+ "email": email,
+ }
+ if email in self.names:
+ item["name"] = self.names[email]
+
+ payload["email"]["cc"].append(item)
+
+ if len(bcc):
+ payload["email"]["bcc"] = []
+ for email in bcc:
+ item = {
+ "email": email,
+ }
+ if email in self.names:
+ item["name"] = self.names[email]
+
+ payload["email"]["bcc"].append(item)
+
+ # Perform our post
+ success, response = self._fetch(
+ self.notify_email_url, payload, target, retry=1)
+ if not success:
+ has_error = True
+ continue
+
+ return not has_error
+
+ def _fetch(self, url, payload, target=None, retry=0):
+ """
+ Wrapper to request.post() to manage it's response better and make
+ the send() function cleaner and easier to maintain.
+
+ This function returns True if the _post was successful and False
+ if it wasn't.
+ """
+ headers = {
+ "User-Agent": self.app_id,
+ "Content-Type": "application/json",
+ }
+
+ access_token = self.store.get("access_token")
+ if access_token:
+ headers.update({"Authorization": f"Bearer {access_token}"})
+
+ self.logger.debug("SendPulse POST URL: {} (cert_verify={!r})".format(
+ url, self.verify_certificate,
+ ))
+ self.logger.debug("SendPulse Payload: {}".format(str(payload)))
+
+ # Prepare our default response
+ response = {}
+
+ # Always call throttle before any remote server i/o is made
+ self.throttle()
+ try:
+ r = requests.post(
+ url,
+ data=dumps(payload),
+ headers=headers,
+ verify=self.verify_certificate,
+ timeout=self.request_timeout,
+ )
+
+ try:
+ response = loads(r.content)
+
+ except (AttributeError, TypeError, ValueError):
+ # This gets thrown if we can't parse our JSON Response
+ # - ValueError = r.content is Unparsable
+ # - TypeError = r.content is None
+ # - AttributeError = r is None
+ self.logger.warning("Invalid response from SendPulse server.")
+ self.logger.debug(
+ "Response Details:\r\n{}".format(r.content))
+ return (False, {})
+
+ # Reference status code
+ status_code = r.status_code
+
+ # Key likely expired, we'll reset it and try one more time
+ if status_code == requests.codes.unauthorized \
+ and retry and self.login():
+ return self._fetch(url, payload, target, retry=retry - 1)
+
+ if status_code not in (
+ requests.codes.ok, requests.codes.accepted):
+ # We had a problem
+ status_str = \
+ NotifySendPulse.http_response_code_lookup(
+ status_code)
+
+ if target:
+ self.logger.warning(
+ "Failed to send SendPulse notification to {}: "
+ "{}{}error={}.".format(
+ target,
+ status_str,
+ ", " if status_str else "",
+ status_code))
+ else:
+ self.logger.warning(
+ "SendPulse Authentication Request failed: "
+ "{}{}error={}.".format(
+ status_str,
+ ", " if status_str else "",
+ status_code))
+
+ self.logger.debug(
+ "Response Details:\r\n{}".format(r.content))
+
+ else:
+ if target:
+ self.logger.info(
+ "Sent SendPulse notification to {}.".format(target))
+ else:
+ self.logger.debug("SendPulse authentication successful")
+
+ return (True, response)
+
+ except requests.RequestException as e:
+ self.logger.warning(
+ "A Connection error occurred sending SendPulse "
+ "notification to {}.".format(target))
+ self.logger.debug("Socket Exception: {}".format(str(e)))
+
+ return (False, response)
+
+ @staticmethod
+ def parse_url(url):
+ """
+ Parses the URL and returns enough arguments that can allow
+ us to re-instantiate this object.
+
+ """
+
+ results = NotifyBase.parse_url(url, verify_host=False)
+ if not results:
+ # We're done early as we couldn't load the results
+ return results
+
+ # Define our minimum requirements; defining them now saves us from
+ # having to if/else all kinds of branches below...
+ results["from_addr"] = None
+ results["client_id"] = None
+ results["client_secret"] = None
+
+ # Prepare our targets
+ results["targets"] = []
+
+ # Our URL looks like this:
+ # {schema}://{from_addr}:{client_id}/{client_secret}/{targets}
+ #
+ # which actually equates to:
+ # {schema}://{user}@{host}/{client_id}/{client_secret}
+ # /{email1}/{email2}/etc..
+ # ^ ^
+ # | |
+ # -from addr-
+ if "from" in results["qsd"]:
+ results["from_addr"] = \
+ NotifySendPulse.unquote(results["qsd"]["from"].rstrip())
+
+ if is_email(results["from_addr"]):
+ # Our hostname is free'd up to be interpreted as part of the
+ # targets
+ results["targets"].append(
+ NotifySendPulse.unquote(results["host"]))
+ results["host"] = ""
+
+ if "user" in results["qsd"] and \
+ is_email(NotifySendPulse.unquote(results["user"])):
+ # Our hostname is free'd up to be interpreted as part of the
+ # targets
+ results["targets"].append(NotifySendPulse.unquote(results["host"]))
+ results["host"] = ""
+
+ # Get our potential email targets
+ # First 2 elements are the client_id and client_secret
+ results["targets"] += NotifySendPulse.split_path(results["fullpath"])
+ # check for our client id
+ if "id" in results["qsd"] and len(results["qsd"]["id"]):
+ # Store our Client ID
+ results["client_id"] = \
+ NotifySendPulse.unquote(results["qsd"]["id"])
+
+ elif results["targets"]:
+ # Store our Client ID
+ results["client_id"] = results["targets"].pop(0)
+
+ if "secret" in results["qsd"] and len(results["qsd"]["secret"]):
+ # Store our Client Secret
+ results["client_secret"] = \
+ NotifySendPulse.unquote(results["qsd"]["secret"])
+
+ elif results["targets"]:
+ # Store our Client Secret
+ results["client_secret"] = results["targets"].pop(0)
+
+ # The 'to' makes it easier to use yaml configuration
+ if "to" in results["qsd"] and len(results["qsd"]["to"]):
+ results["targets"].append(
+ NotifySendPulse.unquote(results["qsd"]["to"]))
+
+ # Handle Carbon Copy Addresses
+ if "cc" in results["qsd"] and len(results["qsd"]["cc"]):
+ results["cc"] = \
+ NotifySendPulse.unquote(results["qsd"]["cc"])
+
+ # Handle Blind Carbon Copy Addresses
+ if "bcc" in results["qsd"] and len(results["qsd"]["bcc"]):
+ results["bcc"] = \
+ NotifySendPulse.unquote(results["qsd"]["bcc"])
+
+ # Handle Blind Carbon Copy Addresses
+ if "template" in results["qsd"] and len(results["qsd"]["template"]):
+ results["template"] = \
+ NotifySendPulse.unquote(results["qsd"]["template"])
+
+ # Add any template substitutions
+ results["template_data"] = results["qsd+"]
+
+ return results
diff --git a/packaging/redhat/python-apprise.spec b/packaging/redhat/python-apprise.spec
index 253ad8d6..85872223 100644
--- a/packaging/redhat/python-apprise.spec
+++ b/packaging/redhat/python-apprise.spec
@@ -69,8 +69,8 @@ notification services. It supports sending alerts to platforms such as: \
`ParsePlatform`, `Plivo`, `PopcornNotify`, `Prowl`, `Pushalot`, \
`PushBullet`, `Pushjet`, `PushMe`, `Pushover`, `Pushplus`, `PushSafer`, \
`Pushy`, `PushDeer`, `QQ Push`, `Revolt`, `Reddit`, `Resend`, `Rocket.Chat`, \
-`RSyslog`, `SendGrid`, `ServerChan`, `Seven`, `SFR`, `Signal`, `SIGNL4`, \
-`SimplePush`, `Sinch`, `Slack`, `SMPP`, `SMSEagle`, `SMS Manager`, \
+`RSyslog`, `SendGrid`, `SendPulse`, `ServerChan`, `Seven`, `SFR`, `Signal`, \
+`SIGNL4`, `SimplePush`, `Sinch`, `Slack`, `SMPP`, `SMSEagle`, `SMS Manager`, \
`SMTP2Go`, `SparkPost`, `Splunk`, `Spike`, `Spug Push`, `Super Toasty`, \
`Streamlabs`, `Stride`, `Synology Chat`, `Syslog`, `Techulus Push`, \
`Telegram`, `Threema Gateway`, `Twilio`, `Twitter`, `Twist`, `Vapid`, \
diff --git a/pyproject.toml b/pyproject.toml
index 18f3a576..a710847a 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -138,6 +138,7 @@ keywords = [
"RSyslog",
"Ryver",
"SendGrid",
+ "SendPulse",
"ServerChan",
"SES",
"Seven",
diff --git a/tests/test_apprise_utils.py b/tests/test_apprise_utils.py
index e16d816b..f36b642e 100644
--- a/tests/test_apprise_utils.py
+++ b/tests/test_apprise_utils.py
@@ -2028,6 +2028,9 @@ def test_parse_emails():
assert len(results) == len(emails)
for email in emails:
assert email in results
+ is_email = utils.parse.is_email(email)
+ assert is_email
+ assert is_email.get("name")
# pass the entries in as a list
results = utils.parse.parse_emails(emails)
@@ -2115,7 +2118,8 @@ def test_parse_urls():
# Commas and spaces found inside URLs are ignored
urls = [
(
- "mailgun://noreply@sandbox.mailgun.org/apikey/?to=test@example.com,test2@example.com,,"
+ "mailgun://noreply@sandbox.mailgun.org/apikey/"
+ "?to=test@example.com,test2@example.com,,"
" abcd@example.com"
),
(
@@ -3029,7 +3033,8 @@ def test_cwe312_url():
assert (
utils.cwe312.cwe312_url(
- "slack://mybot@xoxb-43598234231-3248932482278-BZK5Wj15B9mPh1RkShJoCZ44"
+ "slack://mybot@xoxb-43598234231-3248932482278"
+ "-BZK5Wj15B9mPh1RkShJoCZ44"
"/lead2gold@gmail.com"
)
== "slack://mybot@x...4/l...m"
diff --git a/tests/test_plugin_sendpulse.py b/tests/test_plugin_sendpulse.py
new file mode 100644
index 00000000..bde529a4
--- /dev/null
+++ b/tests/test_plugin_sendpulse.py
@@ -0,0 +1,559 @@
+# BSD 2-Clause License
+#
+# Apprise - Push Notification Library.
+# Copyright (c) 2025, Chris Caron
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice,
+# this list of conditions and the following disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
+# and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+
+from json import dumps, loads
+
+# Disable logging for a cleaner testing output
+import logging
+import os
+from unittest import mock
+
+from helpers import AppriseURLTester
+import pytest
+import requests
+
+from apprise import Apprise, AppriseAttachment, NotifyType
+from apprise.plugins.sendpulse import NotifySendPulse
+
+logging.disable(logging.CRITICAL)
+
+SENDPULSE_GOOD_RESPONSE = dumps({
+ "access_token": "abc123",
+ "expires_in": 3600,
+})
+
+SENDPULSE_BAD_RESPONSE = "{"
+
+# Attachment Directory
+TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), "var")
+
+# Our Testing URLs
+apprise_url_tests = (
+ ("sendpulse://", {
+ "instance": TypeError,
+ }),
+ ("sendpulse://:@/", {
+ "instance": TypeError,
+ }),
+ ("sendpulse://abcd", {
+ # invalid from email
+ "instance": TypeError,
+ }),
+ ("sendpulse://abcd@host.com", {
+ # Just an Email specified, no client_id or client_secret
+ "instance": TypeError,
+ }),
+ ("sendpulse://user@example.com/client_id/cs/?template=invalid", {
+ # Invalid template
+ "instance": TypeError,
+ }),
+ ("sendpulse://user@example.com/client_id/cs1/?template=123", {
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://user@example.com/client_id/cs1/", {
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://user@example.com/client_id/cs1/?format=text", {
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://user@example.com/client_id/cs1/?format=html", {
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://chris@example.com/client_id/cs1/?from=Chris", {
+ # Set name only
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://?id=ci&secret=cs&user=chris@example.com", {
+ # Set login through user= only
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://?id=ci&secret=cs&user=Chris", {
+ # Set login through user= only
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://?id=ci&secret=cs&user=chris", {
+ # Set login through user= only - invaild email
+ "instance": TypeError,
+ }),
+ ("sendpulse://example.com/client_id/cs1/?user=chris", {
+ # Set user as a name only
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://client_id/cs1/?user=chris@example.ca", {
+ # Set user as email
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://client_id/cs1/?from=Chris", {
+ # set full email with name
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://?from=Chris&id=ci&secret=cs", {
+ # leverage all get params from URL
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://user@example.com/client_id/cs1a/", {
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_BAD_RESPONSE,
+ # Notify will fail because auth failed
+ "response": False,
+ }),
+ ("sendpulse://user@example.com/client_id/cs2/"
+ "?bcc=l2g@nuxref.com", {
+ # A good email with Blind Carbon Copy
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://user@example.com/client_id/cs2/"
+ "?bcc=invalid", {
+ # A good email with Blind Carbon Copy
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://user@example.com/client_id/cs3/"
+ "?cc=l2g@nuxref.com", {
+ # A good email with Carbon Copy
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://user@example.com/client_id/cs4/"
+ "?cc=Chris", {
+ # A good email with Carbon Copy
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://user@example.com/client_id/cs5/"
+ "?cc=invalid", {
+ # A good email with Carbon Copy
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://user@example.com/client_id/cs6/"
+ "?to=invalid", {
+ # an invalid to email
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://user@example.com/client_id/cs7/chris@example.com", {
+ # An email with a designated to email
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://user@example.com/client_id/cs8/"
+ "?to=Chris", {
+ # An email with a full name in in To field
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://user@example.com/client_id/cs9/"
+ "chris@example.com/chris2@example.com/Test", {
+ # Several emails to notify
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://user@example.com/client_id/cs10/"
+ "?cc=Chris", {
+ # An email with a full name in cc
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://user@example.com/client_id/cs11/"
+ "?cc=chris@example.com", {
+ # An email with a full name in cc
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://user@example.com/client_id/cs12/"
+ "?bcc=Chris", {
+ # An email with a full name in bcc
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://user@example.com/client_id/cs13/"
+ "?bcc=chris@example.com", {
+ # An email with a full name in bcc
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://user@example.com/client_id/cs14/"
+ "?to=Chris", {
+ # An email with a full name in bcc
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://user@example.com/client_id/cs15/"
+ "?to=chris@example.com", {
+ # An email with a full name in bcc
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://user@example.com/client_id/cs16/"
+ "?template=1234&+sub=value&+sub2=value2", {
+ # A good email with a template + substitutions
+ "instance": NotifySendPulse,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+
+ # Our expected url(privacy=True) startswith() response:
+ "privacy_url": "sendpulse://user@example.com/c...d/c...6/",
+ }),
+ ("sendpulse://user@example.com/client_id/cs17/", {
+ "instance": NotifySendPulse,
+ # force a failure
+ "response": False,
+ "requests_response_code": requests.codes.internal_server_error,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://user@example.com/client_id/cs18/", {
+ "instance": NotifySendPulse,
+ # throw a bizzare code forcing us to fail to look it up
+ "response": False,
+ "requests_response_code": 999,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+ ("sendpulse://user@example.com/client_id/cs19/", {
+ "instance": NotifySendPulse,
+ # Throws a series of connection and transfer exceptions when this flag
+ # is set and tests that we gracfully handle them
+ "test_requests_exceptions": True,
+ "requests_response_text": SENDPULSE_GOOD_RESPONSE,
+ }),
+)
+
+
+def test_plugin_sendpulse_urls():
+ """
+ NotifySendPulse() Apprise URLs
+
+ """
+
+ # Run our general tests
+ AppriseURLTester(tests=apprise_url_tests).run_all()
+
+
+@mock.patch("requests.post")
+def test_plugin_sendpulse_edge_cases(mock_post):
+ """
+ NotifySendPulse() Edge Cases
+ """
+ request = mock.Mock()
+ request.status_code = requests.codes.ok
+ request.content = SENDPULSE_GOOD_RESPONSE
+
+ # Prepare Mock
+ mock_post.return_value = request
+
+ obj = Apprise.instantiate(
+ "sendpulse://user@example.com/ci/cs/Test")
+
+ assert obj.notify(
+ body="body", title="title", notify_type=NotifyType.INFO) is True
+
+ # Test our call count
+ assert mock_post.call_count == 2
+
+ # Authentication
+ assert mock_post.call_args_list[0][0][0] == \
+ "https://api.sendpulse.com/oauth/access_token"
+
+ payload = loads(mock_post.call_args_list[0][1]["data"])
+ assert payload == {
+ "grant_type": "client_credentials",
+ "client_id": "ci",
+ "client_secret": "cs",
+ }
+
+ assert mock_post.call_args_list[1][0][0] == \
+ "https://api.sendpulse.com/smtp/emails"
+ payload = loads(mock_post.call_args_list[1][1]["data"])
+
+ assert payload == {
+ "email": {
+ "from": {
+ "email": "user@example.com", "name": "Apprise"
+ },
+ "to": [{"email": "test@example.com", "name": "Test"}],
+ "subject": "title", "text": "body", "html": "Ym9keQ=="}}
+
+ mock_post.reset_mock()
+
+ obj = Apprise.instantiate("sendpulse://user@example.com/ci/cs/?from=John")
+
+ assert obj.notify(
+ body="body", title="title", notify_type=NotifyType.INFO) is True
+
+ # Test our call count
+ assert mock_post.call_count == 2
+
+ # Authentication
+ assert mock_post.call_args_list[0][0][0] == \
+ "https://api.sendpulse.com/oauth/access_token"
+
+ payload = loads(mock_post.call_args_list[0][1]["data"])
+ assert payload == {
+ "grant_type": "client_credentials",
+ "client_id": "ci",
+ "client_secret": "cs",
+ }
+
+ assert mock_post.call_args_list[1][0][0] == \
+ "https://api.sendpulse.com/smtp/emails"
+ payload = loads(mock_post.call_args_list[1][1]["data"])
+
+ assert payload == {
+ "email": {
+ "from": {
+ "email": "user@example.com", "name": "John"
+ },
+ "to": [{"email": "user@example.com", "name": "John"}],
+ "subject": "title", "text": "body", "html": "Ym9keQ=="}}
+
+ mock_post.reset_mock()
+
+ # Second call no longer needs to authenticate
+ assert obj.notify(
+ body="body", title="title", notify_type=NotifyType.INFO) is True
+
+ assert mock_post.call_count == 1
+
+ assert mock_post.call_args_list[0][0][0] == \
+ "https://api.sendpulse.com/smtp/emails"
+
+ # force an exception
+ mock_post.side_effect = requests.RequestException
+ assert obj.notify(
+ body="body", title="title", notify_type=NotifyType.INFO) is False
+
+ # Set an invalid return code
+ mock_post.side_effect = None
+ request.status_code = 403
+ assert obj.notify(
+ body="body", title="title", notify_type=NotifyType.INFO) is False
+
+ # Test re-authentication
+ mock_post.reset_mock()
+ request = mock.Mock()
+ obj = Apprise.instantiate("sendpulse://usr2@example.com/ci/cs/?from=Retry")
+
+ class sendpulse:
+ def __init__(self):
+ # 200 login okay
+ # 401 on retrival
+ # recursive re-attempt to login returns 200
+ # fetch after works
+ self._side_effect = iter([
+ requests.codes.ok, requests.codes.unauthorized,
+ requests.codes.ok, requests.codes.ok,
+ ])
+
+ @property
+ def status_code(self):
+ return next(self._side_effect)
+
+ @property
+ def content(self):
+ return SENDPULSE_GOOD_RESPONSE
+
+ mock_post.return_value = sendpulse()
+
+ assert obj.notify(
+ body="body", title="title", notify_type=NotifyType.INFO) is True
+
+ assert mock_post.call_count == 4
+ # Authentication
+ assert mock_post.call_args_list[0][0][0] == \
+ "https://api.sendpulse.com/oauth/access_token"
+ # 401 received
+ assert mock_post.call_args_list[1][0][0] == \
+ "https://api.sendpulse.com/smtp/emails"
+ # Re-authenticate
+ assert mock_post.call_args_list[2][0][0] == \
+ "https://api.sendpulse.com/oauth/access_token"
+ # Try again
+ assert mock_post.call_args_list[3][0][0] == \
+ "https://api.sendpulse.com/smtp/emails"
+
+ # Test re-authentication (no recursive loops)
+ mock_post.reset_mock()
+ request = mock.Mock()
+ obj = Apprise.instantiate("sendpulse://usr2@example.com/ci/cs/?from=Retry")
+
+ class sendpulse:
+ def __init__(self):
+ # oauth always returns okay but notify returns 401
+ # recursive re-attempt only once
+ self._side_effect = iter([
+ requests.codes.ok, requests.codes.unauthorized,
+ requests.codes.ok, requests.codes.unauthorized,
+ requests.codes.ok, requests.codes.unauthorized,
+ requests.codes.ok, requests.codes.unauthorized,
+ requests.codes.ok, requests.codes.unauthorized,
+ requests.codes.ok, requests.codes.unauthorized,
+ requests.codes.ok, requests.codes.unauthorized,
+ requests.codes.ok, requests.codes.unauthorized,
+ ])
+
+ @property
+ def status_code(self):
+ return next(self._side_effect)
+
+ @property
+ def content(self):
+ return SENDPULSE_GOOD_RESPONSE
+
+ mock_post.return_value = sendpulse()
+
+ assert obj.notify(
+ body="body", title="title", notify_type=NotifyType.INFO) is False
+
+ assert mock_post.call_count == 4
+ # Authentication
+ assert mock_post.call_args_list[0][0][0] == \
+ "https://api.sendpulse.com/oauth/access_token"
+ # 401 received
+ assert mock_post.call_args_list[1][0][0] == \
+ "https://api.sendpulse.com/smtp/emails"
+ # Re-authenticate
+ assert mock_post.call_args_list[2][0][0] == \
+ "https://api.sendpulse.com/oauth/access_token"
+ # Last failed attempt
+ assert mock_post.call_args_list[3][0][0] == \
+ "https://api.sendpulse.com/smtp/emails"
+
+ mock_post.side_effect = None
+ request = mock.Mock()
+ request.status_code = requests.codes.ok
+ request.content = SENDPULSE_GOOD_RESPONSE
+ mock_post.return_value = request
+ for expires_in in (None, -1, "garbage", 3600, 300000):
+ request.content = dumps({
+ "access_token": "abc123",
+ "expires_in": expires_in,
+ })
+
+ # Instantiate our object
+ obj = Apprise.instantiate("sendpulse://user@example.com/ci/cs/")
+
+ # Test variations of responses
+ obj.notify(
+ body="body", title="title", notify_type=NotifyType.INFO)
+
+ # expires_in is missing
+ request.content = dumps({
+ "access_token": "abc123",
+ })
+
+ # Instantiate our object
+ obj = Apprise.instantiate("sendpulse://user@example.com/ci/cs/")
+ assert obj.notify(
+ body="body", title="title", notify_type=NotifyType.INFO) is True
+
+
+def test_plugin_sendpulse_fail_cases():
+ """
+ NotifySendPulse() Fail Cases
+
+ """
+
+ # no client_id
+ with pytest.raises(TypeError):
+ NotifySendPulse(
+ client_id="abcd", client_secret=None,
+ from_addr="user@example.com")
+
+ with pytest.raises(TypeError):
+ NotifySendPulse(
+ client_id=None, client_secret="abcd123",
+ from_addr="user@example.com")
+
+ # invalid from email
+ with pytest.raises(TypeError):
+ NotifySendPulse(
+ client_id="abcd", client_secret="abcd456", from_addr="!invalid")
+
+ # no email
+ with pytest.raises(TypeError):
+ NotifySendPulse(
+ client_id="abcd", client_secret="abcd789", from_addr=None)
+
+ # Invalid To email address
+ NotifySendPulse(
+ client_id="abcd", client_secret="abcd321",
+ from_addr="user@example.com", targets="!invalid")
+
+ # Test invalid bcc/cc entries mixed with good ones
+ assert isinstance(NotifySendPulse(
+ client_id="abcd", client_secret="abcd654",
+ from_addr="l2g@example.com",
+ bcc=("abc@def.com", "!invalid"),
+ cc=("abc@test.org", "!invalid")), NotifySendPulse)
+
+
+@mock.patch("requests.post")
+def test_plugin_sendpulse_attachments(mock_post):
+ """
+ NotifySendPulse() Attachments
+
+ """
+
+ request = mock.Mock()
+ request.status_code = requests.codes.ok
+ request.content = SENDPULSE_GOOD_RESPONSE
+
+ # Prepare Mock
+ mock_post.return_value = request
+
+ path = os.path.join(TEST_VAR_DIR, "apprise-test.gif")
+ attach = AppriseAttachment(path)
+ obj = Apprise.instantiate("sendpulse://user@example.com/aaaa/bbbb")
+ assert isinstance(obj, NotifySendPulse)
+ assert obj.notify(
+ body="body", title="title", notify_type=NotifyType.INFO,
+ attach=attach) is True
+
+ mock_post.reset_mock()
+
+ # Try again in a use case where we can't access the file
+ with mock.patch("os.path.isfile", return_value=False):
+ assert obj.notify(
+ body="body", title="title", notify_type=NotifyType.INFO,
+ attach=attach) is False
+
+ # Try again in a use case where we can't access the file
+ with mock.patch("builtins.open", side_effect=OSError):
+ assert obj.notify(
+ body="body", title="title", notify_type=NotifyType.INFO,
+ attach=attach) is False