diff --git a/apprise/plugins/sendpulse.py b/apprise/plugins/sendpulse.py index 7c0f523e..821f53dd 100644 --- a/apprise/plugins/sendpulse.py +++ b/apprise/plugins/sendpulse.py @@ -1,7 +1,7 @@ # BSD 2-Clause License # # Apprise - Push Notification Library. -# Copyright (c) 2024, Chris Caron +# Copyright (c) 2025, Chris Caron # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: @@ -30,15 +30,16 @@ import base64 from email.utils import formataddr -from json import dumps +from json import dumps, loads +import re import requests from .. import exception -from ..common import NotifyFormat, NotifyType +from ..common import NotifyFormat, NotifyType, PersistentStoreMode from ..conversion import convert_between from ..locale import gettext_lazy as _ -from ..utils.parse import is_email, parse_emails, parse_list, validate_regex +from ..utils.parse import is_email, parse_emails, validate_regex from .base import NotifyBase @@ -65,6 +66,9 @@ class NotifySendPulse(NotifyBase): # The default Email API URL to use notify_email_url = "https://api.sendpulse.com/smtp/emails" + # Our OAuth Query + notify_oauth_url = "https://api.sendpulse.com/oauth/access_token" + # Support attachments attachment_support = True @@ -72,6 +76,18 @@ class NotifySendPulse(NotifyBase): # 60/300 = 0.2 request_rate_per_sec = 0.2 + # Our default is to no not use persistent storage beyond in-memory + # reference + storage_mode = PersistentStoreMode.AUTO + + # Token expiry if not detected in seconds (below is 1 hr) + token_expiry = 3600 + + # The number of seconds to grace for early token renewal + # Below states that 10 seconds bfore our token expiry, we'll + # attempt to renew it + token_expiry_edge = 10 + # Support attachments attachment_support = True @@ -80,14 +96,18 @@ class NotifySendPulse(NotifyBase): # Define object templates templates = ( - "{schema}://{from_email}/{client_id}/{client_secret}/", - "{schema}://{from_email}/{client_id}/{client_secret}/{targets}", + "{schema}://{user}@{host}/{client_secret}/", + "{schema}://{user}@{host}/{client_id}/{client_secret}/{targets}", ) # Define our template arguments template_tokens = dict(NotifyBase.template_tokens, **{ - "from_email": { - "name": _("Source Email"), + "user": { + "name": _("User Name"), + "type": "string", + }, + "host": { + "name": _("Domain"), "type": "string", "required": True, }, @@ -124,7 +144,7 @@ class NotifySendPulse(NotifyBase): "from": { "name": _("From Email"), "type": "string", - "map_to": "from_email", + "map_to": "from_addr", }, "cc": { "name": _("Carbon Copy"), @@ -155,14 +175,60 @@ class NotifySendPulse(NotifyBase): }, } - def __init__(self, from_email, client_id, client_secret, targets=None, - cc=None, bcc=None, template=None, - template_data=None, **kwargs): + def __init__(self, client_id, client_secret, from_addr=None, targets=None, + cc=None, bcc=None, template=None, template_data=None, + **kwargs): """ Initialize Notify SendPulse Object """ super().__init__(**kwargs) + # For tracking our email -> name lookups + self.names = {} + + # Temporary from_addr to work with for parsing + _from_addr = [self.app_id, ""] + + if self.user: + if self.host: + # Prepare the bases of our email + _from_addr = [_from_addr[0], "{}@{}".format( + re.split(r"[\s@]+", self.user)[0], + self.host, + )] + + else: + result = is_email(self.user) + if result: + # Prepare the bases of our email and include domain + self.host = result["domain"] + _from_addr = [ + result["name"] if result["name"] + else _from_addr[0], self.user] + + if isinstance(from_addr, str): + result = is_email(from_addr) + if result: + _from_addr = ( + result["name"] if result["name"] else _from_addr[0], + result["full_email"]) + else: + # Only update the string but use the already detected info + _from_addr[0] = from_addr + + result = is_email(_from_addr[1]) + if not result: + # Parse Source domain based on from_addr + msg = "Invalid ~From~ email specified: {}".format( + "{} <{}>".format(_from_addr[0], _from_addr[1]) + if _from_addr[0] else "{}".format(_from_addr[1])) + self.logger.warning(msg) + raise TypeError(msg) + + # Store our lookup + self.from_addr = _from_addr[1] + self.names[_from_addr[1]] = _from_addr[0] + # Client ID self.client_id = validate_regex( client_id, *self.template_tokens["client_id"]["regex"]) @@ -181,20 +247,6 @@ class NotifySendPulse(NotifyBase): self.logger.warning(msg) raise TypeError(msg) - result = is_email(from_email) - if not result: - msg = "Invalid ~From~ email specified: {}".format(from_email) - self.logger.warning(msg) - raise TypeError(msg) - - # Tracks emails to name lookups (if they exist) - self.__email_map = {} - - # Store from email address - self.from_email = result["full_email"] - self.__email_map[self.from_email] = result["name"] \ - if result["name"] else self.app_id - # Acquire Targets (To Emails) self.targets = [] @@ -204,6 +256,8 @@ class NotifySendPulse(NotifyBase): # Acquire Blind Carbon Copies self.bcc = set() + # No template + self.template = None if template: try: # Store our template @@ -225,9 +279,9 @@ class NotifySendPulse(NotifyBase): for recipient in parse_emails(targets): result = is_email(recipient) if result: - self.targets.append( - (result["name"] if result["name"] else False, - result["full_email"])) + self.targets.append(result["full_email"]) + if result["name"]: + self.names[result["full_email"]] = result["name"] continue self.logger.warning( @@ -237,16 +291,16 @@ class NotifySendPulse(NotifyBase): else: # If our target email list is empty we want to add ourselves to it - self.targets.append(self.from_email) + self.targets.append(self.from_addr) # Validate recipients (cc:) and drop bad ones: - for recipient in parse_list(cc): + for recipient in parse_emails(cc): result = is_email(recipient) if result: self.cc.add(result["full_email"]) if result["name"]: - self.__email_lookup[result["full_email"]] = result["name"] + self.names[result["full_email"]] = result["name"] continue self.logger.warning( @@ -255,13 +309,13 @@ class NotifySendPulse(NotifyBase): ) # Validate recipients (bcc:) and drop bad ones: - for recipient in parse_list(bcc): + for recipient in parse_emails(bcc): result = is_email(recipient) if result: self.bcc.add(result["full_email"]) if result["name"]: - self.__email_lookup[result["full_email"]] = result["name"] + self.names[result["full_email"]] = result["name"] continue self.logger.warning( @@ -271,7 +325,7 @@ class NotifySendPulse(NotifyBase): if len(self.targets) == 0: # Notify ourselves - self.targets.append(self.from_email) + self.targets.append(self.from_addr) return @@ -296,7 +350,7 @@ class NotifySendPulse(NotifyBase): # Handle our Carbon Copy Addresses params["cc"] = ",".join([ formataddr( - (self.__email_lookup.get(e, False), e), + (self.names.get(e, False), e), # Swap comma for it's escaped url code (if detected) since # we're using that as a delimiter charset="utf-8").replace(",", "%2C") @@ -306,7 +360,7 @@ class NotifySendPulse(NotifyBase): # Handle our Blind Carbon Copy Addresses params["bcc"] = ",".join([ formataddr( - (self.__email_lookup.get(e, False), e), + (self.names.get(e, False), e), # Swap comma for it's escaped url code (if detected) since # we're using that as a delimiter charset="utf-8").replace(",", "%2C") @@ -316,6 +370,10 @@ class NotifySendPulse(NotifyBase): # Handle our Template ID if if was specified params["template"] = self.template + # handle from= + if self.names[self.from_addr] != self.app_id: + params["from"] = self.names[self.from_addr] + # Append our template_data into our parameter list params.update( {"+{}".format(k): v for k, v in self.template_data.items()}) @@ -323,11 +381,11 @@ class NotifySendPulse(NotifyBase): # a simple boolean check as to whether we display our target emails # or not has_targets = \ - not (len(self.targets) == 1 and self.targets[0] == self.from_email) + not (len(self.targets) == 1 and self.targets[0] == self.from_addr) return "{schema}://{source}/{cid}/{secret}/{targets}?{params}".format( schema=self.secure_protocol, - source=self.from_email, + source=self.from_addr, cid=self.pprint(self.client_id, privacy, safe=""), secret=self.pprint(self.client_secret, privacy, safe=""), targets="" if not has_targets else "/".join( @@ -341,17 +399,58 @@ class NotifySendPulse(NotifyBase): """ return len(self.targets) + def login(self): + """ + Authenticates with the server to get a access_token + """ + self.store.clear("access_token") + payload = { + "grant_type": "client_credentials", + "client_id": self.client_id, + "client_secret": self.client_secret, + } + + success, response = self._fetch(self.notify_oauth_url, payload) + if not success: + return False + + access_token = response.get("access_token") + + # If we get here, we're authenticated + try: + expires = \ + int(response.get("expires_in")) - self.token_expiry_edge + if expires <= self.token_expiry_edge: + self.logger.error( + "SendPulse token expiry limit returned was invalid") + return False + + elif expires > self.token_expiry: + self.logger.warning( + "SendPulse token expiry limit fixed to: {}s" + .format(self.token_expiry)) + expires = self.token_expiry - self.token_expiry_edge + + except (AttributeError, TypeError, ValueError): + # expires_in was not an integer + self.logger.warning( + "SendPulse token expiry limit presumed to be: {}s".format( + self.token_expiry)) + expires = self.token_expiry - self.token_expiry_edge + + self.store.set("access_token", access_token, expires=expires) + + return access_token + def send(self, body, title="", notify_type=NotifyType.INFO, attach=None, **kwargs): """ Perform SendPulse Notification """ - headers = { - "User-Agent": self.app_id, - "Content-Type": "application/json", - "Authorization": "Bearer {}".format(self.apikey), - } + access_token = self.store.get("access_token") or self.login() + if not access_token: + return False # error tracking (used for function return) has_error = False @@ -360,8 +459,8 @@ class NotifySendPulse(NotifyBase): _payload = { "email": { "from": { - "name": self.from_email[0], - "email": self.from_email[1], + "name": self.names[self.from_addr], + "email": self.from_addr, }, # To is populated further on "to": [], @@ -445,8 +544,8 @@ class NotifySendPulse(NotifyBase): to = { "email": target } - if target in self.__email_lookup: - to["name"] = self.__email_lookup[target] + if target in self.names: + to["name"] = self.names[target] # Set our target payload["email"]["to"] = [to] @@ -457,8 +556,8 @@ class NotifySendPulse(NotifyBase): item = { "email": email, } - if email in self.__email_lookup: - item["name"] = self.__email_lookup[email] + if email in self.names: + item["name"] = self.names[email] payload["email"]["cc"].append(item) @@ -468,63 +567,119 @@ class NotifySendPulse(NotifyBase): item = { "email": email, } - if email in self.__email_lookup: - item["name"] = self.__email_lookup[email] + if email in self.names: + item["name"] = self.names[email] payload["email"]["bcc"].append(item) - self.logger.debug("SendPulse POST URL: %s (cert_verify=%r)", - self.notify_email_url, self.verify_certificate, + # Perform our post + success, response = self._fetch( + self.notify_email_url, payload, target, retry=1) + if not success: + has_error = True + continue + + return not has_error + + def _fetch(self, url, payload, target=None, retry=0): + """ + Wrapper to request.post() to manage it's response better and make + the send() function cleaner and easier to maintain. + + This function returns True if the _post was successful and False + if it wasn't. + """ + headers = { + "User-Agent": self.app_id, + "Content-Type": "application/json", + } + + access_token = self.store.get("access_token") + if access_token: + headers.update({"Authorization": f"Bearer {access_token}"}) + + self.logger.debug("SendPulse POST URL: {} (cert_verify={!r})".format( + url, self.verify_certificate, + )) + self.logger.debug("SendPulse Payload: {}".format(str(payload))) + + # Prepare our default response + response = {} + + # Always call throttle before any remote server i/o is made + self.throttle() + try: + r = requests.post( + url, + data=dumps(payload), + headers=headers, + verify=self.verify_certificate, + timeout=self.request_timeout, ) - self.logger.debug("SendPulse Payload: %s", str(payload)) - # Always call throttle before any remote server i/o is made - self.throttle() try: - r = requests.post( - self.notify_email_url, - data=dumps(payload), - headers=headers, - verify=self.verify_certificate, - timeout=self.request_timeout, - ) - if r.status_code not in ( - requests.codes.ok, requests.codes.accepted): - # We had a problem - status_str = \ - NotifySendPulse.http_response_code_lookup( - r.status_code) + response = loads(r.content) + except (AttributeError, TypeError, ValueError): + # This gets thrown if we can't parse our JSON Response + # - ValueError = r.content is Unparsable + # - TypeError = r.content is None + # - AttributeError = r is None + self.logger.warning("Invalid response from SendPulse server.") + self.logger.debug( + "Response Details:\r\n{}".format(r.content)) + return (False, {}) + + # Reference status code + status_code = r.status_code + + # Key likely expired, we'll reset it and try one more time + if status_code == requests.codes.unauthorized \ + and retry and self.login(): + return self._fetch(url, payload, target, retry=retry - 1) + + if status_code not in ( + requests.codes.ok, requests.codes.accepted): + # We had a problem + status_str = \ + NotifySendPulse.http_response_code_lookup( + status_code) + + if target: self.logger.warning( "Failed to send SendPulse notification to {}: " "{}{}error={}.".format( target, status_str, ", " if status_str else "", - r.status_code)) - - self.logger.debug( - "Response Details:\r\n{}".format(r.content)) - - # Mark our failure - has_error = True - continue - + status_code)) else: + self.logger.warning( + "SendPulse Authentication Request failed: " + "{}{}error={}.".format( + status_str, + ", " if status_str else "", + status_code)) + + self.logger.debug( + "Response Details:\r\n{}".format(r.content)) + + else: + if target: self.logger.info( "Sent SendPulse notification to {}.".format(target)) + else: + self.logger.debug("SendPulse authentication successful") - except requests.RequestException as e: - self.logger.warning( - "A Connection error occurred sending SendPulse " - "notification to {}.".format(target)) - self.logger.debug("Socket Exception: %s", str(e)) + return (True, response) - # Mark our failure - has_error = True - continue + except requests.RequestException as e: + self.logger.warning( + "A Connection error occurred sending SendPulse " + "notification to {}.".format(target)) + self.logger.debug("Socket Exception: {}".format(str(e))) - return not has_error + return (False, response) @staticmethod def parse_url(url): @@ -534,13 +689,22 @@ class NotifySendPulse(NotifyBase): """ - results = NotifyBase.parse_url(url) + results = NotifyBase.parse_url(url, verify_host=False) if not results: # We're done early as we couldn't load the results return results + # Define our minimum requirements; defining them now saves us from + # having to if/else all kinds of branches below... + results["from_addr"] = None + results["client_id"] = None + results["client_secret"] = None + + # Prepare our targets + results["targets"] = [] + # Our URL looks like this: - # {schema}://{from_email}:{client_id}/{client_secret}/{targets} + # {schema}://{from_addr}:{client_id}/{client_secret}/{targets} # # which actually equates to: # {schema}://{user}@{host}/{client_id}/{client_secret} @@ -548,10 +712,27 @@ class NotifySendPulse(NotifyBase): # ^ ^ # | | # -from addr- + if "from" in results["qsd"]: + results["from_addr"] = \ + NotifySendPulse.unquote(results["qsd"]["from"].rstrip()) + + if is_email(results["from_addr"]): + # Our hostname is free'd up to be interpreted as part of the + # targets + results["targets"].append( + NotifySendPulse.unquote(results["host"])) + results["host"] = "" + + if "user" in results["qsd"] and \ + is_email(NotifySendPulse.unquote(results["user"])): + # Our hostname is free'd up to be interpreted as part of the + # targets + results["targets"].append(NotifySendPulse.unquote(results["host"])) + results["host"] = "" # Get our potential email targets # First 2 elements are the client_id and client_secret - results["targets"] = NotifySendPulse.split_path(results["fullpath"]) + results["targets"] += NotifySendPulse.split_path(results["fullpath"]) # check for our client id if "id" in results["qsd"] and len(results["qsd"]["id"]): # Store our Client ID @@ -562,9 +743,6 @@ class NotifySendPulse(NotifyBase): # Store our Client ID results["client_id"] = results["targets"].pop(0) - else: # Not defined - results["client_id"] = None - if "secret" in results["qsd"] and len(results["qsd"]["secret"]): # Store our Client Secret results["client_secret"] = \ @@ -574,47 +752,20 @@ class NotifySendPulse(NotifyBase): # Store our Client Secret results["client_secret"] = results["targets"].pop(0) - else: # Not defined - results["client_secret"] = None - - if "from" in results["qsd"] and len(results["qsd"]["from"]): - results["from_email"] = \ - NotifySendPulse.unquote(results["qsd"]["from_email"]) - - # This means any user@host is the To Address if defined - if results.get("user") and results.get("host"): - results["targets"] += "{}@{}".format( - NotifySendPulse.unquote( - results["password"] - if results["password"] else results["user"]), - NotifySendPulse.unquote(results["host"]), - ) - - elif results.get("user") and results.get("host"): - results["from_email"] = "{}@{}".format( - NotifySendPulse.unquote( - results["password"] - if results["password"] else results["user"]), - NotifySendPulse.unquote(results["host"]), - ) - - else: # Not defined - results["from_email"] = None - # The 'to' makes it easier to use yaml configuration if "to" in results["qsd"] and len(results["qsd"]["to"]): - results["targets"] += \ - NotifySendPulse.parse_list(results["qsd"]["to"]) + results["targets"].append( + NotifySendPulse.unquote(results["qsd"]["to"])) # Handle Carbon Copy Addresses if "cc" in results["qsd"] and len(results["qsd"]["cc"]): results["cc"] = \ - NotifySendPulse.parse_list(results["qsd"]["cc"]) + NotifySendPulse.unquote(results["qsd"]["cc"]) # Handle Blind Carbon Copy Addresses if "bcc" in results["qsd"] and len(results["qsd"]["bcc"]): results["bcc"] = \ - NotifySendPulse.parse_list(results["qsd"]["bcc"]) + NotifySendPulse.unquote(results["qsd"]["bcc"]) # Handle Blind Carbon Copy Addresses if "template" in results["qsd"] and len(results["qsd"]["template"]): diff --git a/tests/test_apprise_utils.py b/tests/test_apprise_utils.py index e16d816b..bef3eda6 100644 --- a/tests/test_apprise_utils.py +++ b/tests/test_apprise_utils.py @@ -2028,6 +2028,9 @@ def test_parse_emails(): assert len(results) == len(emails) for email in emails: assert email in results + is_email = utils.parse.is_email(email) + assert is_email + assert is_email.get('name') # pass the entries in as a list results = utils.parse.parse_emails(emails) @@ -2115,7 +2118,8 @@ def test_parse_urls(): # Commas and spaces found inside URLs are ignored urls = [ ( - "mailgun://noreply@sandbox.mailgun.org/apikey/?to=test@example.com,test2@example.com,," + "mailgun://noreply@sandbox.mailgun.org/apikey/" + "?to=test@example.com,test2@example.com,," " abcd@example.com" ), ( @@ -3029,7 +3033,8 @@ def test_cwe312_url(): assert ( utils.cwe312.cwe312_url( - "slack://mybot@xoxb-43598234231-3248932482278-BZK5Wj15B9mPh1RkShJoCZ44" + "slack://mybot@xoxb-43598234231-3248932482278" + "-BZK5Wj15B9mPh1RkShJoCZ44" "/lead2gold@gmail.com" ) == "slack://mybot@x...4/l...m" diff --git a/tests/test_plugin_sendpulse.py b/tests/test_plugin_sendpulse.py index 4a28db2e..bde529a4 100644 --- a/tests/test_plugin_sendpulse.py +++ b/tests/test_plugin_sendpulse.py @@ -1,7 +1,7 @@ # BSD 2-Clause License # # Apprise - Push Notification Library. -# Copyright (c) 2024, Chris Caron +# Copyright (c) 2025, Chris Caron # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: @@ -25,6 +25,8 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. +from json import dumps, loads + # Disable logging for a cleaner testing output import logging import os @@ -39,73 +41,215 @@ from apprise.plugins.sendpulse import NotifySendPulse logging.disable(logging.CRITICAL) +SENDPULSE_GOOD_RESPONSE = dumps({ + "access_token": "abc123", + "expires_in": 3600, +}) + +SENDPULSE_BAD_RESPONSE = "{" + # Attachment Directory TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), "var") # Our Testing URLs apprise_url_tests = ( ("sendpulse://", { - "instance": None, + "instance": TypeError, }), ("sendpulse://:@/", { - "instance": None, + "instance": TypeError, }), ("sendpulse://abcd", { - # Just an broken email (no email, client_id or secret) - "instance": None, + # invalid from email + "instance": TypeError, }), - ("sendpulse://abcd@host", { + ("sendpulse://abcd@host.com", { # Just an Email specified, no client_id or client_secret - "instance": None, + "instance": TypeError, }), - ("sendpulse://user@example.com/client_id/client_secret/", { + ("sendpulse://user@example.com/client_id/cs/?template=invalid", { + # Invalid template + "instance": TypeError, + }), + ("sendpulse://user@example.com/client_id/cs1/?template=123", { "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, }), - ("sendpulse://user@example.com/client_id/client_secret/" + ("sendpulse://user@example.com/client_id/cs1/", { + "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, + }), + ("sendpulse://user@example.com/client_id/cs1/?format=text", { + "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, + }), + ("sendpulse://user@example.com/client_id/cs1/?format=html", { + "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, + }), + ("sendpulse://chris@example.com/client_id/cs1/?from=Chris", { + # Set name only + "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, + }), + ("sendpulse://?id=ci&secret=cs&user=chris@example.com", { + # Set login through user= only + "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, + }), + ("sendpulse://?id=ci&secret=cs&user=Chris", { + # Set login through user= only + "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, + }), + ("sendpulse://?id=ci&secret=cs&user=chris", { + # Set login through user= only - invaild email + "instance": TypeError, + }), + ("sendpulse://example.com/client_id/cs1/?user=chris", { + # Set user as a name only + "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, + }), + ("sendpulse://client_id/cs1/?user=chris@example.ca", { + # Set user as email + "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, + }), + ("sendpulse://client_id/cs1/?from=Chris", { + # set full email with name + "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, + }), + ("sendpulse://?from=Chris&id=ci&secret=cs", { + # leverage all get params from URL + "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, + }), + ("sendpulse://user@example.com/client_id/cs1a/", { + "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_BAD_RESPONSE, + # Notify will fail because auth failed + "response": False, + }), + ("sendpulse://user@example.com/client_id/cs2/" "?bcc=l2g@nuxref.com", { # A good email with Blind Carbon Copy "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, }), - ("sendpulse://user@example.com/client_id/client_secret/" + ("sendpulse://user@example.com/client_id/cs2/" + "?bcc=invalid", { + # A good email with Blind Carbon Copy + "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, + }), + ("sendpulse://user@example.com/client_id/cs3/" "?cc=l2g@nuxref.com", { # A good email with Carbon Copy "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, }), - ("sendpulse://user@example.com/client_id/client_secret/" - "?to=l2g@nuxref.com", { + ("sendpulse://user@example.com/client_id/cs4/" + "?cc=Chris", { # A good email with Carbon Copy "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, }), - ("sendpulse://user@example.com/client_id/client_secret/" - "?template=1234", { - # A good email with a template + no substitutions + ("sendpulse://user@example.com/client_id/cs5/" + "?cc=invalid", { + # A good email with Carbon Copy "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, }), - ("sendpulse://user@example.com/client_id/client_secret/" + ("sendpulse://user@example.com/client_id/cs6/" + "?to=invalid", { + # an invalid to email + "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, + }), + ("sendpulse://user@example.com/client_id/cs7/chris@example.com", { + # An email with a designated to email + "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, + }), + ("sendpulse://user@example.com/client_id/cs8/" + "?to=Chris", { + # An email with a full name in in To field + "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, + }), + ("sendpulse://user@example.com/client_id/cs9/" + "chris@example.com/chris2@example.com/Test", { + # Several emails to notify + "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, + }), + ("sendpulse://user@example.com/client_id/cs10/" + "?cc=Chris", { + # An email with a full name in cc + "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, + }), + ("sendpulse://user@example.com/client_id/cs11/" + "?cc=chris@example.com", { + # An email with a full name in cc + "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, + }), + ("sendpulse://user@example.com/client_id/cs12/" + "?bcc=Chris", { + # An email with a full name in bcc + "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, + }), + ("sendpulse://user@example.com/client_id/cs13/" + "?bcc=chris@example.com", { + # An email with a full name in bcc + "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, + }), + ("sendpulse://user@example.com/client_id/cs14/" + "?to=Chris", { + # An email with a full name in bcc + "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, + }), + ("sendpulse://user@example.com/client_id/cs15/" + "?to=chris@example.com", { + # An email with a full name in bcc + "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, + }), + ("sendpulse://user@example.com/client_id/cs16/" "?template=1234&+sub=value&+sub2=value2", { # A good email with a template + substitutions "instance": NotifySendPulse, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, # Our expected url(privacy=True) startswith() response: - "privacy_url": "sendpulse://a...d:user@example.com/", + "privacy_url": "sendpulse://user@example.com/c...d/c...6/", }), - ("sendpulse://user@example.com/client_id/client_secret/", { + ("sendpulse://user@example.com/client_id/cs17/", { "instance": NotifySendPulse, # force a failure "response": False, "requests_response_code": requests.codes.internal_server_error, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, }), - ("sendpulse://user@example.com/client_id/client_secret/", { + ("sendpulse://user@example.com/client_id/cs18/", { "instance": NotifySendPulse, # throw a bizzare code forcing us to fail to look it up "response": False, "requests_response_code": 999, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, }), - ("sendpulse://user@example.com/client_id/client_secret/", { + ("sendpulse://user@example.com/client_id/cs19/", { "instance": NotifySendPulse, # Throws a series of connection and transfer exceptions when this flag # is set and tests that we gracfully handle them "test_requests_exceptions": True, + "requests_response_text": SENDPULSE_GOOD_RESPONSE, }), ) @@ -120,11 +264,227 @@ def test_plugin_sendpulse_urls(): AppriseURLTester(tests=apprise_url_tests).run_all() -@mock.patch("requests.get") @mock.patch("requests.post") -def test_plugin_sendpulse_edge_cases(mock_post, mock_get): +def test_plugin_sendpulse_edge_cases(mock_post): """ NotifySendPulse() Edge Cases + """ + request = mock.Mock() + request.status_code = requests.codes.ok + request.content = SENDPULSE_GOOD_RESPONSE + + # Prepare Mock + mock_post.return_value = request + + obj = Apprise.instantiate( + "sendpulse://user@example.com/ci/cs/Test") + + assert obj.notify( + body="body", title="title", notify_type=NotifyType.INFO) is True + + # Test our call count + assert mock_post.call_count == 2 + + # Authentication + assert mock_post.call_args_list[0][0][0] == \ + "https://api.sendpulse.com/oauth/access_token" + + payload = loads(mock_post.call_args_list[0][1]["data"]) + assert payload == { + "grant_type": "client_credentials", + "client_id": "ci", + "client_secret": "cs", + } + + assert mock_post.call_args_list[1][0][0] == \ + "https://api.sendpulse.com/smtp/emails" + payload = loads(mock_post.call_args_list[1][1]["data"]) + + assert payload == { + "email": { + "from": { + "email": "user@example.com", "name": "Apprise" + }, + "to": [{"email": "test@example.com", "name": "Test"}], + "subject": "title", "text": "body", "html": "Ym9keQ=="}} + + mock_post.reset_mock() + + obj = Apprise.instantiate("sendpulse://user@example.com/ci/cs/?from=John") + + assert obj.notify( + body="body", title="title", notify_type=NotifyType.INFO) is True + + # Test our call count + assert mock_post.call_count == 2 + + # Authentication + assert mock_post.call_args_list[0][0][0] == \ + "https://api.sendpulse.com/oauth/access_token" + + payload = loads(mock_post.call_args_list[0][1]["data"]) + assert payload == { + "grant_type": "client_credentials", + "client_id": "ci", + "client_secret": "cs", + } + + assert mock_post.call_args_list[1][0][0] == \ + "https://api.sendpulse.com/smtp/emails" + payload = loads(mock_post.call_args_list[1][1]["data"]) + + assert payload == { + "email": { + "from": { + "email": "user@example.com", "name": "John" + }, + "to": [{"email": "user@example.com", "name": "John"}], + "subject": "title", "text": "body", "html": "Ym9keQ=="}} + + mock_post.reset_mock() + + # Second call no longer needs to authenticate + assert obj.notify( + body="body", title="title", notify_type=NotifyType.INFO) is True + + assert mock_post.call_count == 1 + + assert mock_post.call_args_list[0][0][0] == \ + "https://api.sendpulse.com/smtp/emails" + + # force an exception + mock_post.side_effect = requests.RequestException + assert obj.notify( + body="body", title="title", notify_type=NotifyType.INFO) is False + + # Set an invalid return code + mock_post.side_effect = None + request.status_code = 403 + assert obj.notify( + body="body", title="title", notify_type=NotifyType.INFO) is False + + # Test re-authentication + mock_post.reset_mock() + request = mock.Mock() + obj = Apprise.instantiate("sendpulse://usr2@example.com/ci/cs/?from=Retry") + + class sendpulse: + def __init__(self): + # 200 login okay + # 401 on retrival + # recursive re-attempt to login returns 200 + # fetch after works + self._side_effect = iter([ + requests.codes.ok, requests.codes.unauthorized, + requests.codes.ok, requests.codes.ok, + ]) + + @property + def status_code(self): + return next(self._side_effect) + + @property + def content(self): + return SENDPULSE_GOOD_RESPONSE + + mock_post.return_value = sendpulse() + + assert obj.notify( + body="body", title="title", notify_type=NotifyType.INFO) is True + + assert mock_post.call_count == 4 + # Authentication + assert mock_post.call_args_list[0][0][0] == \ + "https://api.sendpulse.com/oauth/access_token" + # 401 received + assert mock_post.call_args_list[1][0][0] == \ + "https://api.sendpulse.com/smtp/emails" + # Re-authenticate + assert mock_post.call_args_list[2][0][0] == \ + "https://api.sendpulse.com/oauth/access_token" + # Try again + assert mock_post.call_args_list[3][0][0] == \ + "https://api.sendpulse.com/smtp/emails" + + # Test re-authentication (no recursive loops) + mock_post.reset_mock() + request = mock.Mock() + obj = Apprise.instantiate("sendpulse://usr2@example.com/ci/cs/?from=Retry") + + class sendpulse: + def __init__(self): + # oauth always returns okay but notify returns 401 + # recursive re-attempt only once + self._side_effect = iter([ + requests.codes.ok, requests.codes.unauthorized, + requests.codes.ok, requests.codes.unauthorized, + requests.codes.ok, requests.codes.unauthorized, + requests.codes.ok, requests.codes.unauthorized, + requests.codes.ok, requests.codes.unauthorized, + requests.codes.ok, requests.codes.unauthorized, + requests.codes.ok, requests.codes.unauthorized, + requests.codes.ok, requests.codes.unauthorized, + ]) + + @property + def status_code(self): + return next(self._side_effect) + + @property + def content(self): + return SENDPULSE_GOOD_RESPONSE + + mock_post.return_value = sendpulse() + + assert obj.notify( + body="body", title="title", notify_type=NotifyType.INFO) is False + + assert mock_post.call_count == 4 + # Authentication + assert mock_post.call_args_list[0][0][0] == \ + "https://api.sendpulse.com/oauth/access_token" + # 401 received + assert mock_post.call_args_list[1][0][0] == \ + "https://api.sendpulse.com/smtp/emails" + # Re-authenticate + assert mock_post.call_args_list[2][0][0] == \ + "https://api.sendpulse.com/oauth/access_token" + # Last failed attempt + assert mock_post.call_args_list[3][0][0] == \ + "https://api.sendpulse.com/smtp/emails" + + mock_post.side_effect = None + request = mock.Mock() + request.status_code = requests.codes.ok + request.content = SENDPULSE_GOOD_RESPONSE + mock_post.return_value = request + for expires_in in (None, -1, "garbage", 3600, 300000): + request.content = dumps({ + "access_token": "abc123", + "expires_in": expires_in, + }) + + # Instantiate our object + obj = Apprise.instantiate("sendpulse://user@example.com/ci/cs/") + + # Test variations of responses + obj.notify( + body="body", title="title", notify_type=NotifyType.INFO) + + # expires_in is missing + request.content = dumps({ + "access_token": "abc123", + }) + + # Instantiate our object + obj = Apprise.instantiate("sendpulse://user@example.com/ci/cs/") + assert obj.notify( + body="body", title="title", notify_type=NotifyType.INFO) is True + + +def test_plugin_sendpulse_fail_cases(): + """ + NotifySendPulse() Fail Cases """ @@ -132,39 +492,38 @@ def test_plugin_sendpulse_edge_cases(mock_post, mock_get): with pytest.raises(TypeError): NotifySendPulse( client_id="abcd", client_secret=None, - from_email="user@example.com") + from_addr="user@example.com") with pytest.raises(TypeError): NotifySendPulse( - client_id=None, client_secret="abcd", - from_email="user@example.com") + client_id=None, client_secret="abcd123", + from_addr="user@example.com") # invalid from email with pytest.raises(TypeError): NotifySendPulse( - client_id="abcd", client_secret="abcd", from_email="!invalid") + client_id="abcd", client_secret="abcd456", from_addr="!invalid") # no email with pytest.raises(TypeError): NotifySendPulse( - client_id="abcd", client_secret="abcd", from_email=None) + client_id="abcd", client_secret="abcd789", from_addr=None) # Invalid To email address NotifySendPulse( - client_id="abcd", client_secret="abcd", - from_email="user@example.com", targets="!invalid") + client_id="abcd", client_secret="abcd321", + from_addr="user@example.com", targets="!invalid") # Test invalid bcc/cc entries mixed with good ones assert isinstance(NotifySendPulse( - client_id="abcd", client_secret="abcd", - from_email="l2g@example.com", + client_id="abcd", client_secret="abcd654", + from_addr="l2g@example.com", bcc=("abc@def.com", "!invalid"), cc=("abc@test.org", "!invalid")), NotifySendPulse) -@mock.patch("requests.get") @mock.patch("requests.post") -def test_plugin_sendpulse_attachments(mock_post, mock_get): +def test_plugin_sendpulse_attachments(mock_post): """ NotifySendPulse() Attachments @@ -172,21 +531,20 @@ def test_plugin_sendpulse_attachments(mock_post, mock_get): request = mock.Mock() request.status_code = requests.codes.ok + request.content = SENDPULSE_GOOD_RESPONSE # Prepare Mock mock_post.return_value = request - mock_get.return_value = request path = os.path.join(TEST_VAR_DIR, "apprise-test.gif") attach = AppriseAttachment(path) - obj = Apprise.instantiate("sendpulse://user@example.com/abcd/abcd") + obj = Apprise.instantiate("sendpulse://user@example.com/aaaa/bbbb") assert isinstance(obj, NotifySendPulse) assert obj.notify( body="body", title="title", notify_type=NotifyType.INFO, attach=attach) is True mock_post.reset_mock() - mock_get.reset_mock() # Try again in a use case where we can't access the file with mock.patch("os.path.isfile", return_value=False):