From 4ecc90a94ab01a2ae91916ce09015071f31594ee Mon Sep 17 00:00:00 2001 From: Chris Caron Date: Fri, 21 Feb 2025 17:39:17 -0500 Subject: [PATCH] Home Assistant Service Notification Support Added --- README.md | 2 +- apprise/plugins/home_assistant.py | 479 ++++++++++++++++++++--------- apprise/utils/parse.py | 90 +++++- tests/test_apprise_utils.py | 110 +++++++ tests/test_plugin_homeassistant.py | 408 ++++++++++++++++-------- 5 files changed, 815 insertions(+), 274 deletions(-) diff --git a/README.md b/README.md index d6a96523..bdd0728d 100644 --- a/README.md +++ b/README.md @@ -73,7 +73,7 @@ The table below identifies the services this tool supports and some example serv | [Gotify](https://github.com/caronc/apprise/wiki/Notify_gotify) | gotify:// or gotifys:// | (TCP) 80 or 443 | gotify://hostname/token
gotifys://hostname/token?priority=high | [Growl](https://github.com/caronc/apprise/wiki/Notify_growl) | growl:// | (UDP) 23053 | growl://hostname
growl://hostname:portno
growl://password@hostname
growl://password@hostname:port
**Note**: you can also use the get parameter _version_ which can allow the growl request to behave using the older v1.x protocol. An example would look like: growl://hostname?version=1 | [Guilded](https://github.com/caronc/apprise/wiki/Notify_guilded) | guilded:// | (TCP) 443 | guilded://webhook_id/webhook_token
guilded://avatar@webhook_id/webhook_token -| [Home Assistant](https://github.com/caronc/apprise/wiki/Notify_homeassistant) | hassio:// or hassios:// | (TCP) 8123 or 443 | hassio://hostname/accesstoken
hassio://user@hostname/accesstoken
hassio://user:password@hostname:port/accesstoken
hassio://hostname/optional/path/accesstoken +| [Home Assistant](https://github.com/caronc/apprise/wiki/Notify_homeassistant) | hassio:// or hassios:// | (TCP) 8123 or 443 | hassio://hostname/long-lived-token
hassio://user@hostname/long-lived-token
hassio://user:password@hostname:port/long-lived-token
hassio://hostname/optional/path/long-lived-token
hassio://user@hostname/long-lived-token/service
hassio://user@hostname/long-lived-token/Service1/Service2/ServiceN | [IFTTT](https://github.com/caronc/apprise/wiki/Notify_ifttt) | ifttt:// | (TCP) 443 | ifttt://webhooksID/Event
ifttt://webhooksID/Event1/Event2/EventN
ifttt://webhooksID/Event1/?+Key=Value
ifttt://webhooksID/Event1/?-Key=value1 | [Join](https://github.com/caronc/apprise/wiki/Notify_join) | join:// | (TCP) 443 | join://apikey/device
join://apikey/device1/device2/deviceN/
join://apikey/group
join://apikey/groupA/groupB/groupN
join://apikey/DeviceA/groupA/groupN/DeviceN/ | [KODI](https://github.com/caronc/apprise/wiki/Notify_kodi) | kodi:// or kodis:// | (TCP) 8080 or 443 | kodi://hostname
kodi://user@hostname
kodi://user:password@hostname:port diff --git a/apprise/plugins/home_assistant.py b/apprise/plugins/home_assistant.py index 51868e5f..6cd0bb31 100644 --- a/apprise/plugins/home_assistant.py +++ b/apprise/plugins/home_assistant.py @@ -27,8 +27,10 @@ # You must generate a "Long-Lived Access Token". This can be done from your # Home Assistant Profile page. - +from itertools import chain from json import dumps +import math +import re from uuid import uuid4 import requests @@ -36,12 +38,40 @@ import requests from ..common import NotifyType from ..locale import gettext_lazy as _ from ..url import PrivacyMode -from ..utils.parse import validate_regex +from ..utils.parse import ( + is_domain_service_target, + parse_bool, + parse_domain_service_targets, + validate_regex, +) from .base import NotifyBase +# This regex matches exactly 8 hex digits, +# a dot, then exactly 64 hex digits. it can also be a JWT +# token in which case it will be 180 characters+ +RE_IS_LONG_LIVED_TOKEN = re.compile( + r"^([0-9a-f]{8}\.[0-9a-f]{64}|[a-z0-9_-]+\.[a-z0-9_-]+\.[a-z0-9_-]+)$", + re.I) + +# Define our supported device notification formats: +# - service +# - default domain is always 'notify' if one isn't detected +# - service:target +# - service:target1,target2,target3 +# - domain.service +# - domain.service:target +# - domain.service:target1,target2,target3 +# - - targets can be comma/space separated if more hten one +# - service:target1,target2,target3 + +# Define a persistent entry (used for handling message delivery +PERSISTENT_ENTRY = (None, None, []) + class NotifyHomeAssistant(NotifyBase): - """A wrapper for Home Assistant Notifications.""" + """ + A wrapper for Home Assistant Notifications + """ # The default descriptive name associated with the Notification service_name = "HomeAssistant" @@ -58,116 +88,179 @@ class NotifyHomeAssistant(NotifyBase): # Default to Home Assistant Default Insecure port of 8123 instead of 80 default_insecure_port = 8123 + # The maximum amount of services that can be notified in a single batch + default_batch_size = 10 + + # The default ha notification domain if one isn't detected + default_domain = "notify" + # A URL that takes you to the setup/help of the specific protocol setup_url = "https://github.com/caronc/apprise/wiki/Notify_homeassistant" # Define object templates templates = ( - "{schema}://{host}/{accesstoken}", - "{schema}://{host}:{port}/{accesstoken}", - "{schema}://{user}@{host}/{accesstoken}", - "{schema}://{user}@{host}:{port}/{accesstoken}", - "{schema}://{user}:{password}@{host}/{accesstoken}", - "{schema}://{user}:{password}@{host}:{port}/{accesstoken}", + "{schema}://{host}/{token}", + "{schema}://{host}:{port}/{token}", + "{schema}://{user}@{host}/{token}", + "{schema}://{user}@{host}:{port}/{token}", + "{schema}://{user}:{password}@{host}/{token}", + "{schema}://{user}:{password}@{host}:{port}/{token}", + "{schema}://{host}/{token}/{targets}", + "{schema}://{host}:{port}/{token}/{targets}", + "{schema}://{user}@{host}/{token}/{targets}", + "{schema}://{user}@{host}:{port}/{token}/{targets}", + "{schema}://{user}:{password}@{host}/{token}/{targets}", + "{schema}://{user}:{password}@{host}:{port}/{token}/{targets}", ) # Define our template tokens - template_tokens = dict( - NotifyBase.template_tokens, - **{ - "host": { - "name": _("Hostname"), - "type": "string", - "required": True, - }, - "port": { - "name": _("Port"), - "type": "int", - "min": 1, - "max": 65535, - }, - "user": { - "name": _("Username"), - "type": "string", - }, - "password": { - "name": _("Password"), - "type": "string", - "private": True, - }, - "accesstoken": { - "name": _("Long-Lived Access Token"), - "type": "string", - "private": True, - "required": True, - }, + template_tokens = dict(NotifyBase.template_tokens, **{ + "host": { + "name": _("Hostname"), + "type": "string", + "required": True, }, - ) + "port": { + "name": _("Port"), + "type": "int", + "min": 1, + "max": 65535, + }, + "user": { + "name": _("Username"), + "type": "string", + }, + "password": { + "name": _("Password"), + "type": "string", + "private": True, + }, + "token": { + "name": _("Long-Lived Access Token"), + "type": "string", + "private": True, + "required": True, + }, + "target_device": { + "name": _("Target Device"), + "type": "string", + "map_to": "targets", + }, + "targets": { + "name": _("Targets"), + "type": "list:string", + }, + }) # Define our template arguments - template_args = dict( - NotifyBase.template_args, - **{ - "nid": { - # Optional Unique Notification ID - "name": _("Notification ID"), - "type": "string", - "regex": (r"^[a-z0-9_-]+$", "i"), - }, + template_args = dict(NotifyBase.template_args, **{ + "nid": { + # Optional Unique Notification ID + "name": _("Notification ID"), + "type": "string", + "regex": (r"^[a-z0-9_-]+$", "i"), }, - ) + "prefix": { + # Path Prefix to use (for those not hosting their hasio instance + # in /) + "name": _("Path Prefix"), + "type": "string", + }, + "batch": { + "name": _("Batch Mode"), + "type": "bool", + "default": False, + }, + "to": { + "alias_of": "targets", + }, + }) - def __init__(self, accesstoken, nid=None, **kwargs): - """Initialize Home Assistant Object.""" + def __init__(self, token, nid=None, targets=None, prefix=None, + batch=None, **kwargs): + """ + Initialize Home Assistant Object + """ super().__init__(**kwargs) - self.fullpath = kwargs.get("fullpath", "") + self.prefix = prefix or kwargs.get("fullpath", "") if not (self.secure or self.port): # Use default insecure port self.port = self.default_insecure_port # Long-Lived Access token (generated from User Profile) - self.accesstoken = validate_regex(accesstoken) - if not self.accesstoken: - msg = ( - "An invalid Home Assistant Long-Lived Access Token " - f"({accesstoken}) was specified." - ) + self.token = validate_regex(token) + if not self.token: + msg = "An invalid Home Assistant Long-Lived Access Token " \ + "({}) was specified.".format(token) self.logger.warning(msg) raise TypeError(msg) # An Optional Notification Identifier self.nid = None if nid: - self.nid = validate_regex(nid, *self.template_args["nid"]["regex"]) + self.nid = validate_regex( + nid, *self.template_args["nid"]["regex"]) if not self.nid: - msg = ( - "An invalid Home Assistant Notification Identifier " - f"({nid}) was specified." - ) + msg = "An invalid Home Assistant Notification Identifier " \ + "({}) was specified.".format(nid) self.logger.warning(msg) raise TypeError(msg) + # Prepare Batch Mode Flag + self.batch = self.template_args["batch"]["default"] \ + if batch is None else batch + + # Store our targets + self.targets = [] + + # Track our invalid targets + self._invalid_targets = [] + + if targets: + for target in parse_domain_service_targets(targets): + result = is_domain_service_target( + target, domain=self.default_domain) + if result: + self.targets.append(( + result["domain"], + result["service"], + result["targets"], + )) + continue + + self.logger.warning( + "Dropped invalid [domain.]service[:target] entry " + "({}) specified.".format(target), + ) + self._invalid_targets.append(target) + else: + self.targets = [PERSISTENT_ENTRY] + return def send(self, body, title="", notify_type=NotifyType.INFO, **kwargs): - """Sends Message.""" + """ + Sends Message + """ + + if not self.targets: + self.logger.warning( + "There are no valid Home Assistant targets to notify.") + return False # Prepare our persistent_notification.create payload payload = { "title": title, "message": body, - # Use a unique ID so we don't over-write the last message - # we posted. Otherwise use the notification id specified - "notification_id": self.nid if self.nid else str(uuid4()), } # Prepare our headers headers = { "User-Agent": self.app_id, "Content-Type": "application/json", - "Authorization": f"Bearer {self.accesstoken}", + "Authorization": "Bearer {}".format(self.token), } auth = None @@ -177,24 +270,66 @@ class NotifyHomeAssistant(NotifyBase): # Set our schema schema = "https" if self.secure else "http" - url = f"{schema}://{self.host}" + url = "{}://{}".format(schema, self.host) if isinstance(self.port, int): url += f":{self.port}" - url += ( - self.fullpath.rstrip("/") - + "/api/services/persistent_notification/create" - ) + # Determine if we're doing it the old way (using persistent notices) + # or the new (supporting device targets) + has_targets = \ + bool(not self.targets or self.targets[0] is not PERSISTENT_ENTRY) + + # our base url + base_url = url + self.prefix.rstrip("/") + \ + "/api/services/persistent_notification/create" + + # Send in batches if identified to do so + batch_size = 1 if not self.batch else self.default_batch_size + + for target in self.targets: + # Use a unique ID so we don't over-write the last message we + # posted. Otherwise use the notification id specified + if has_targets: + # Base target details + domain = target[0] + service = target[1] + + # Prepare our URL + base_url = url + self.prefix.rstrip("/") + \ + f"/api/services/{domain}/{service}" + + # Possibly prepare batches + if target[2]: + _payload = payload.copy() + for index in range(0, len(target[2]), batch_size): + _payload["targets"] = \ + target[2][index:index + batch_size] + if not self._ha_post( + base_url, _payload, headers, auth): + return False + + # We're done + return True + + if not self._ha_post(base_url, payload, headers, auth): + return False + + return True + + def _ha_post(self, url, payload, headers, auth=None): + """ + Wrapper to single upstream server post + """ + # Notification ID + payload["notification_id"] = self.nid if self.nid else str(uuid4()) self.logger.debug( - "Home Assistant POST URL:" - f" {url} (cert_verify={self.verify_certificate!r})" - ) - self.logger.debug(f"Home Assistant Payload: {payload!s}") + "Home Assistant POST URL: {} (cert_verify={!r})".format( + url, self.verify_certificate)) + self.logger.debug("Home Assistant Payload: {}".format(str(payload))) # Always call throttle before any remote server i/o is made self.throttle() - try: r = requests.post( url, @@ -206,18 +341,18 @@ class NotifyHomeAssistant(NotifyBase): ) if r.status_code != requests.codes.ok: # We had a problem - status_str = NotifyHomeAssistant.http_response_code_lookup( - r.status_code - ) + status_str = \ + NotifyHomeAssistant.http_response_code_lookup( + r.status_code) self.logger.warning( "Failed to send Home Assistant notification: " "{}{}error={}.".format( - status_str, ", " if status_str else "", r.status_code - ) - ) + status_str, + ", " if status_str else "", + r.status_code)) - self.logger.debug(f"Response Details:\r\n{r.content}") + self.logger.debug("Response Details:\r\n{}".format(r.content)) # Return; we're done return False @@ -228,9 +363,8 @@ class NotifyHomeAssistant(NotifyBase): except requests.RequestException as e: self.logger.warning( "A Connection error occurred sending Home Assistant " - f"notification to {self.host}." - ) - self.logger.debug(f"Socket Exception: {e!s}") + "notification to {}.".format(self.host)) + self.logger.debug("Socket Exception: {}".format(str(e))) # Return; we're done return False @@ -239,30 +373,34 @@ class NotifyHomeAssistant(NotifyBase): @property def url_identifier(self): - """Returns all of the identifiers that make this URL unique from - another simliar one. - - Targets or end points should never be identified here. + """ + Returns all of the identifiers that make this URL unique from + another simliar one. Targets or end points should never be identified + here. """ return ( self.secure_protocol if self.secure else self.protocol, - self.user, - self.password, - self.host, - ( - self.port - if self.port - else (443 if self.secure else self.default_insecure_port) - ), - self.fullpath.rstrip("/"), - self.accesstoken, + self.user, self.password, self.host, + self.port if self.port else ( + 443 if self.secure else self.default_insecure_port), + self.prefix.rstrip("/"), + self.token, ) def url(self, privacy=False, *args, **kwargs): - """Returns the URL built dynamically based on specified arguments.""" + """ + Returns the URL built dynamically based on specified arguments. + """ # Define any URL parameters - params = {} + params = { + "batch": "yes" if self.batch else "no", + } + + if self.prefix not in ("", "/"): + params["prefix"] = "/" if not self.prefix \ + else "/{}/".format(self.prefix.strip("/")) + if self.nid: params["nid"] = self.nid @@ -275,8 +413,7 @@ class NotifyHomeAssistant(NotifyBase): auth = "{user}:{password}@".format( user=NotifyHomeAssistant.quote(self.user, safe=""), password=self.pprint( - self.password, privacy, mode=PrivacyMode.Secret, safe="" - ), + self.password, privacy, mode=PrivacyMode.Secret, safe=""), ) elif self.user: auth = "{user}@".format( @@ -285,65 +422,123 @@ class NotifyHomeAssistant(NotifyBase): default_port = 443 if self.secure else self.default_insecure_port - url = ( - "{schema}://{auth}{hostname}{port}{fullpath}" - "{accesstoken}/?{params}" - ) + url = "{schema}://{auth}{hostname}{port}/" \ + "{token}/{targets}?{params}" + + # Determine if we're doing it the old way (using persistent notices) + # or the new (supporting device targets) + has_targets = \ + bool(not self.targets or self.targets[0] is not PERSISTENT_ENTRY) return url.format( schema=self.secure_protocol if self.secure else self.protocol, auth=auth, # never encode hostname since we're expecting it to be a valid one hostname=self.host, - port=( - "" - if not self.port or self.port == default_port - else f":{self.port}" - ), - fullpath=( - "/" - if not self.fullpath - else "/{}/".format( - NotifyHomeAssistant.quote( - self.fullpath.strip("/"), safe="/" - ) - ) - ), - accesstoken=self.pprint(self.accesstoken, privacy, safe=""), + port="" if not self.port or self.port == default_port + else ":{}".format(self.port), + token=self.pprint(self.token, privacy, safe=""), + targets="" if not has_targets else "/".join( + chain([NotifyHomeAssistant.quote("{}.{}{}".format( + x[0], x[1], "" + if not x[2] else ":" + ",".join(x[2])), safe="") + for x in self.targets], + [NotifyHomeAssistant.quote(x, safe="") + for x in self._invalid_targets])), params=NotifyHomeAssistant.urlencode(params), ) + def __len__(self): + """ + Returns the number of targets associated with this notification + """ + # + # Factor batch into calculation + # + + # Determine if we're doing it the old way (using persistent notices) + # or the new (supporting device targets) + has_targets = \ + bool(not self.targets or self.targets[0] is not PERSISTENT_ENTRY) + + if not has_targets: + return 1 + + # Handle targets + batch_size = 1 if not self.batch else self.default_batch_size + return sum( + math.ceil(len(identities) / batch_size) + if identities else 1 for _, _, identities in self.targets) + @staticmethod def parse_url(url): - """Parses the URL and returns enough arguments that can allow us to re- - instantiate this object.""" + """ + Parses the URL and returns enough arguments that can allow + us to re-instantiate this object. + + """ results = NotifyBase.parse_url(url, verify_host=False) if not results: # We're done early as we couldn't load the results return results - # Get our Long-Lived Access Token - if "accesstoken" in results["qsd"] and len( - results["qsd"]["accesstoken"] - ): - results["accesstoken"] = NotifyHomeAssistant.unquote( - results["qsd"]["accesstoken"] - ) + # Set our path to use: + if "prefix" in results["qsd"] and len(results["qsd"]["prefix"]): + results["prefix"] = \ + NotifyHomeAssistant.unquote(results["qsd"]["prefix"]) - else: - # Acquire our full path - fullpath = NotifyHomeAssistant.split_path(results["fullpath"]) + # Long Lived Access token placeholder + results["token"] = None - # Otherwise pop the last element from our path to be it - results["accesstoken"] = fullpath.pop() if fullpath else None + # Get our Long-Lived Access Token (if defined) + if "token" in results["qsd"] and \ + len(results["qsd"]["token"]): + results["token"] = \ + NotifyHomeAssistant.unquote(results["qsd"]["token"]) - # Re-assemble our full path - results["fullpath"] = "/" + "/".join(fullpath) if fullpath else "" + # Acquire our full path + tokens = NotifyHomeAssistant.split_path(results["fullpath"]) + results["targets"] = [] + + while tokens: + # Iterate through our tokens + token = tokens.pop() + if not results["token"]: + if RE_IS_LONG_LIVED_TOKEN.match(token): + # Store our access token + results["token"] = token + + # Re-assemble our full path + results["fullpath"] = "/" + "/".join(tokens) + continue + + # If we don't have an access token, then we can assume + # it's a device we're storing + results["targets"].append(token) + continue + + elif "prefix" not in results: + # Re-assemble our full path + results["fullpath"] = "/" + "/".join([*tokens, token]) + + # We're done + break + + # prefix is in the result set, so therefore we're dealing with a + # custom target/service + results["targets"].append(token) + + # Get Batch Mode Flag + results["batch"] = \ + parse_bool(results["qsd"].get( + "batch", + NotifyHomeAssistant.template_args["batch"]["default"])) # Allow the specification of a unique notification_id so that # it will always replace the last one sent. if "nid" in results["qsd"] and len(results["qsd"]["nid"]): - results["nid"] = NotifyHomeAssistant.unquote(results["qsd"]["nid"]) + results["nid"] = \ + NotifyHomeAssistant.unquote(results["qsd"]["nid"]) return results diff --git a/apprise/utils/parse.py b/apprise/utils/parse.py index c751be1f..a2462682 100644 --- a/apprise/utils/parse.py +++ b/apprise/utils/parse.py @@ -100,6 +100,15 @@ PHONE_NO_DETECTION_RE = re.compile( r"\s*([+(\s]*[0-9][0-9()\s-]+[0-9])(?=$|[\s,+(]+[0-9])", re.I ) +IS_DOMAIN_SERVICE_TARGET = re.compile( + r"\s*((?P[a-z0-9_-]+)\.)?(?P[a-z0-9_-]+)" + r"(:(?P[a-z0-9_,-]+))?", re.I) + +DOMAIN_SERVICE_TARGET_DETECTION_RE = re.compile( + r"\s*((?:[a-z0-9_-]+\.)?[a-z0-9_-]+" + r"(?::(?:[a-z0-9_-]+(?:,+[a-z0-9_-]+)+?))?)" + r"(?=$|(?:\s|,+\s|\s,+)+(?:[a-z0-9_-]+\.)?[a-z0-9_-]+)", re.I) + # Support for prefix: (string followed by colon) infront of phone no PHONE_NO_WPREFIX_DETECTION_RE = re.compile( r"\s*((?:[a-z]+:)?[+(\s]*[0-9][0-9()\s-]+[0-9])" @@ -262,6 +271,44 @@ def is_uuid(uuid): return bool(match) +def is_domain_service_target(entry, domain="notify"): + """Determine if the specified entry a domain.service:target type + + Expects a string containing the following formats: + - service + - service:target + - service:target1,target2 + - domain.service:target + - domain.service:target1,target2 + + Args: + entry (str): The string you want to check. + + Returns: + bool: Returns False if the entry specified is domain.service:target + """ + + try: + result = IS_DOMAIN_SERVICE_TARGET.match(entry) + if not result: + # not parseable content as it does not even conform closely to a + # domain.service:target + return False + + except TypeError: + # not parseable content + return False + + return { + # Store domain or set default if not acquired + "domain": result.group("domain") if result.group("domain") else domain, + # store service + "service": result.group("service"), + # store targets if defined + "targets": parse_list(result.group("targets")) + } + + def is_phone_no(phone, min_len=10): """Determine if the specified entry is a phone number. @@ -358,7 +405,7 @@ def is_call_sign(callsign): callsign (str): The string you want to check. Returns: - bool: Returns False if the address specified is not a phone number + bool: Returns False if the enry specified is not a callsign """ try: @@ -850,6 +897,47 @@ def parse_bool(arg, default=False): return bool(arg) +def parse_domain_service_targets( + *args, store_unparseable=True, domain="notify", **kwargs): + """ + Takes a string containing the following formats separated by space + - service + - service:target + - service:target1,target2 + - domain.service:target + - domain.service:target1,target2 + + If no domain is parsed, the default domain is returned. + + Targets can be comma separated (if multiple are to be defined) + """ + + result = [] + for arg in args: + if isinstance(arg, str) and arg: + _result = DOMAIN_SERVICE_TARGET_DETECTION_RE.findall(arg) + if _result: + result += _result + + elif not _result and store_unparseable: + # we had content passed into us that was lost because it was + # so poorly formatted that it didn't even come close to + # meeting the regular expression we defined. We intentially + # keep it as part of our result set so that parsing done + # at a higher level can at least report this to the end user + # and hopefully give them some indication as to what they + # may have done wrong. + result += \ + list(filter(bool, re.split(STRING_DELIMITERS, arg))) + + elif isinstance(arg, (set, list, tuple)): + # Use recursion to handle the list of phone numbers + result += parse_domain_service_targets( + *arg, store_unparseable=store_unparseable, domain=domain) + + return result + + def parse_phone_no(*args, store_unparseable=True, prefix=False, **kwargs): """Takes a string containing phone numbers separated by comma's and/or spaces and returns a list.""" diff --git a/tests/test_apprise_utils.py b/tests/test_apprise_utils.py index e16d816b..d1d5c565 100644 --- a/tests/test_apprise_utils.py +++ b/tests/test_apprise_utils.py @@ -1665,6 +1665,65 @@ def test_is_email(): assert results["user"] == "a-z0-9_!#$%&*/=?%`{|}~^.-" +def test_is_domain_service_target(): + """ + API: is_domain_service_target() function + + """ + # Invalid information + assert utils.parse.is_domain_service_target(None) is False + assert utils.parse.is_domain_service_target(42) is False + assert utils.parse.is_domain_service_target(object) is False + assert utils.parse.is_domain_service_target("") is False + assert utils.parse.is_domain_service_target("+()") is False + assert utils.parse.is_domain_service_target("+") is False + + # Valid entries + result = utils.parse.is_domain_service_target("service") + assert isinstance(result, dict) + assert result["service"] == "service" + # Default domain + assert result["domain"] == "notify" + assert isinstance(result["targets"], list) + assert len(result["targets"]) == 0 + + result = utils.parse.is_domain_service_target("domain.service") + assert isinstance(result, dict) + assert result["service"] == "service" + assert result["domain"] == "domain" + assert isinstance(result["targets"], list) + assert len(result["targets"]) == 0 + + result = utils.parse.is_domain_service_target("domain.service:target") + assert isinstance(result, dict) + assert result["service"] == "service" + assert result["domain"] == "domain" + assert isinstance(result["targets"], list) + assert len(result["targets"]) == 1 + assert result["targets"][0] == "target" + + result = utils.parse.is_domain_service_target("domain.service:t1,t2,t3") + assert isinstance(result, dict) + assert result["service"] == "service" + assert result["domain"] == "domain" + assert isinstance(result["targets"], list) + assert len(result["targets"]) == 3 + assert "t1" in result["targets"] + assert "t2" in result["targets"] + assert "t3" in result["targets"] + + result = utils.parse.is_domain_service_target( + "service:t1,t2,t3", domain="new_default") + assert isinstance(result, dict) + assert result["service"] == "service" + # Default domain + assert result["domain"] == "new_default" + assert isinstance(result["targets"], list) + assert len(result["targets"]) == 3 + assert "t1" in result["targets"] + assert "t2" in result["targets"] + assert "t3" in result["targets"] + def test_is_call_sign_no(): """ API: is_call_sign() function @@ -1858,6 +1917,57 @@ def test_parse_call_sign(): assert "DF1ABC" in results +def test_parse_domain_service_targets(): + """utils: parse_domain_service_targets() testing """ + + results = utils.parse.parse_domain_service_targets("") + assert isinstance(results, list) + assert len(results) == 0 + + results = utils.parse.parse_domain_service_targets("service1 service2") + assert isinstance(results, list) + assert len(results) == 2 + assert "service1" in results + assert "service2" in results + + results = utils.parse.parse_domain_service_targets( + "service1:target1,target2") + assert isinstance(results, list) + assert len(results) == 1 + assert "service1:target1,target2" in results + + results = utils.parse.parse_domain_service_targets( + "service1:target1,target2 service2 domain.service3") + assert isinstance(results, list) + assert len(results) == 3 + assert "service1:target1,target2" in results + assert "service2" in results + assert "domain.service3" in results + + # Support a comma in the space between entries + results = utils.parse.parse_domain_service_targets( + "service1:target1,target2, service2 ,domain.service3," + " , , service4") + assert isinstance(results, list) + assert len(results) == 4 + assert "service1:target1,target2" in results + assert "service2" in results + assert "domain.service3" in results + assert "service4" in results + + results = utils.parse.parse_domain_service_targets( + "service:target1,target2") + assert isinstance(results, list) + assert len(results) == 1 + assert "service:target1,target2" in results + + # Handle unparseables + results = utils.parse.parse_domain_service_targets( + ": %invalid ^entries%", store_unparseable=False) + assert isinstance(results, list) + assert len(results) == 0 + + def test_parse_phone_no(): """utils: parse_phone_no() testing""" # A simple single array entry (As str) diff --git a/tests/test_plugin_homeassistant.py b/tests/test_plugin_homeassistant.py index 71efded0..c19a35e4 100644 --- a/tests/test_plugin_homeassistant.py +++ b/tests/test_plugin_homeassistant.py @@ -25,6 +25,8 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. +import json + # Disable logging for a cleaner testing output import logging from unittest import mock @@ -39,134 +41,110 @@ logging.disable(logging.CRITICAL) # Our Testing URLs apprise_url_tests = ( - ( - "hassio://:@/", - { - "instance": TypeError, - }, - ), - ( - "hassio://", - { - "instance": TypeError, - }, - ), - ( - "hassios://", - { - "instance": TypeError, - }, - ), + ("hassio://:@/", { + "instance": TypeError, + }), + ("hassio://", { + "instance": TypeError, + }), + ("hassios://", { + "instance": TypeError, + }), # No Long Lived Access Token specified - ( - "hassio://user@localhost", - { - "instance": TypeError, - }, - ), - ( - "hassio://localhost/long-lived-access-token", - { - "instance": NotifyHomeAssistant, - }, - ), - ( - "hassio://user:pass@localhost/long-lived-access-token/", - { - "instance": NotifyHomeAssistant, - # Our expected url(privacy=True) startswith() response: - "privacy_url": "hassio://user:****@localhost/l...n", - }, - ), - ( - "hassio://localhost:80/long-lived-access-token", - { - "instance": NotifyHomeAssistant, - }, - ), - ( - "hassio://user@localhost:8123/llat", - { - "instance": NotifyHomeAssistant, - "privacy_url": "hassio://user@localhost/l...t", - }, - ), - ( - "hassios://localhost/llat?nid=!%", - { - # Invalid notification_id - "instance": TypeError, - }, - ), - ( - "hassios://localhost/llat?nid=abcd", - { - # Valid notification_id - "instance": NotifyHomeAssistant, - }, - ), - ( - "hassios://user:pass@localhost/llat", - { - "instance": NotifyHomeAssistant, - "privacy_url": "hassios://user:****@localhost/l...t", - }, - ), - ( - "hassios://localhost:8443/path/llat/", - { - "instance": NotifyHomeAssistant, - "privacy_url": "hassios://localhost:8443/path/l...t", - }, - ), - ( - "hassio://localhost:8123/a/path?accesstoken=llat", - { - "instance": NotifyHomeAssistant, - # Default port; so it's stripped off - # accesstoken was specified as kwarg - "privacy_url": "hassio://localhost/a/path/l...t", - }, - ), - ( - "hassios://user:password@localhost:80/llat/", - { - "instance": NotifyHomeAssistant, - "privacy_url": "hassios://user:****@localhost:80", - }, - ), - ( - "hassio://user:pass@localhost:8123/llat", - { - "instance": NotifyHomeAssistant, - # force a failure - "response": False, - "requests_response_code": requests.codes.internal_server_error, - }, - ), - ( - "hassio://user:pass@localhost/llat", - { - "instance": NotifyHomeAssistant, - # throw a bizzare code forcing us to fail to look it up - "response": False, - "requests_response_code": 999, - }, - ), - ( - "hassio://user:pass@localhost/llat", - { - "instance": NotifyHomeAssistant, - # Throws a series of i/o exceptions with this flag - # is set and tests that we gracfully handle them - "test_requests_exceptions": True, - }, - ), + ("hassio://user@localhost", { + "instance": TypeError, + }), + ("hassio://localhost/long.lived.token", { + "instance": NotifyHomeAssistant, + }), + ("hassio://localhost/prefix/path/long.lived.token", { + "instance": NotifyHomeAssistant, + }), + ("hassio://localhost/long.lived.token?prefix=/ha", { + "instance": NotifyHomeAssistant, + }), + ("hassio://localhost/service/?token=long.lived.token&prefix=/ha", { + "instance": NotifyHomeAssistant, + }), + ("hassio://localhost/?token=long.lived.token&prefix=/ha&to=service", { + "instance": NotifyHomeAssistant, + }), + ("hassio://localhost/service/$%/?token=long.lived.token&prefix=/ha", { + # Tests an invalid service entry + "instance": NotifyHomeAssistant, + }), + ("hassio://localhost/%only%/%invalid%/?token=lng.lived.token&prefix=/ha", { + # Tests an invalid service entry + "instance": NotifyHomeAssistant, + # we'll have a notify response failure in this case + "notify_response": False, + }), + ("hassio://user:pass@localhost/long.lived.token/", { + "instance": NotifyHomeAssistant, + + # Our expected url(privacy=True) startswith() response: + "privacy_url": "hassio://user:****@localhost/l...n", + }), + ("hassio://localhost:80/long.lived.token", { + "instance": NotifyHomeAssistant, + }), + ("hassio://user@localhost:8123/long.lived.token", { + "instance": NotifyHomeAssistant, + "privacy_url": "hassio://user@localhost/l...n", + }), + ("hassios://localhost/long.lived.token?nid=!%", { + # Invalid notification_id + "instance": TypeError, + }), + ("hassios://localhost/long.lived.token?nid=abcd", { + # Valid notification_id + "instance": NotifyHomeAssistant, + }), + ("hassios://user:pass@localhost/long.lived.token", { + "instance": NotifyHomeAssistant, + "privacy_url": "hassios://user:****@localhost/l...n", + }), + ("hassios://localhost:8443/path/long.lived.token/", { + "instance": NotifyHomeAssistant, + "privacy_url": "hassios://localhost:8443/l...n", + }), + ("hassio://localhost:8123/a/path?token=long.lived.token", { + "instance": NotifyHomeAssistant, + # Default port; so it's stripped off + # token was specified as kwarg + "privacy_url": "hassio://localhost/l...n", + }), + ("hassios://user:password@localhost:80/long.lived.token/", { + "instance": NotifyHomeAssistant, + + "privacy_url": "hassios://user:****@localhost:80", + }), + ("hassio://user:pass@localhost:8123/long.lived.token", { + "instance": NotifyHomeAssistant, + # force a failure + "response": False, + "requests_response_code": requests.codes.internal_server_error, + }), + ("hassio://user:pass@localhost/long.lived.token", { + "instance": NotifyHomeAssistant, + # throw a bizzare code forcing us to fail to look it up + "response": False, + "requests_response_code": 999, + }), + ("hassio://user:pass@localhost/long.lived.token", { + "instance": NotifyHomeAssistant, + # Throws a series of connection and transfer exceptions when this flag + # is set and tests that we gracfully handle them + "test_requests_exceptions": True, + }), ) def test_plugin_homeassistant_urls(): - """NotifyHomeAssistant() Apprise URLs.""" + """ + NotifyHomeAssistant() Apprise URLs + + """ # Run our general tests AppriseURLTester(tests=apprise_url_tests).run_all() @@ -174,7 +152,10 @@ def test_plugin_homeassistant_urls(): @mock.patch("requests.post") def test_plugin_homeassistant_general(mock_post): - """NotifyHomeAssistant() General Checks.""" + """ + NotifyHomeAssistant() General Checks + + """ response = mock.Mock() response.content = "" @@ -183,8 +164,8 @@ def test_plugin_homeassistant_general(mock_post): # Prepare Mock mock_post.return_value = response - # Variation Initializations - obj = Apprise.instantiate("hassio://localhost/accesstoken") + # Initializations + obj = Apprise.instantiate("hassio://localhost/long.lived.token") assert isinstance(obj, NotifyHomeAssistant) is True assert isinstance(obj.url(), str) is True @@ -192,7 +173,174 @@ def test_plugin_homeassistant_general(mock_post): assert obj.send(body="test") is True assert mock_post.call_count == 1 - assert ( - mock_post.call_args_list[0][0][0] - == "http://localhost:8123/api/services/persistent_notification/create" - ) + assert mock_post.call_args_list[0][0][0] == \ + "http://localhost:8123/api/services/persistent_notification/create" + + # Reset our mock object + mock_post.reset_mock() + + # Now let's notify an object + obj = Apprise.instantiate( + "hassio://localhost/long.lived.token/service") + assert isinstance(obj, NotifyHomeAssistant) is True + assert isinstance(obj.url(), str) is True + + # Send Notification + assert obj.send(body="test") is True + + assert mock_post.call_args_list[0][0][0] == \ + "http://localhost:8123/api/services/notify/service" + posted_json = json.loads(mock_post.call_args_list[0][1]["data"]) + assert "notification_id" in posted_json + assert "targets" not in posted_json + assert "message" in posted_json + assert posted_json["message"] == "test" + assert "title" in posted_json + assert posted_json["title"] == "" + + # Reset our mock object + mock_post.reset_mock() + + # + # No Batch Processing + # + + # Now let's notify an object + obj = Apprise.instantiate( + "hassio://localhost/long.lived.token/serviceA:target1,target2/" + "service2/domain1.service3?batch=no") + assert isinstance(obj, NotifyHomeAssistant) is True + assert isinstance(obj.url(), str) is True + + # Send Notification + assert obj.send(body="test-body", title="title") is True + + # Entries are split apart + assert len(obj) == 4 + assert mock_post.call_count == 4 + + assert mock_post.call_args_list[0][0][0] == \ + "http://localhost:8123/api/services/domain1/service3" + posted_json = json.loads(mock_post.call_args_list[0][1]["data"]) + assert "notification_id" in posted_json + assert "targets" not in posted_json + assert "message" in posted_json + assert posted_json["message"] == "test-body" + assert "title" in posted_json + assert posted_json["title"] == "title" + + assert mock_post.call_args_list[1][0][0] == \ + "http://localhost:8123/api/services/notify/service2" + posted_json = json.loads(mock_post.call_args_list[1][1]["data"]) + assert "notification_id" in posted_json + assert "targets" not in posted_json + assert "message" in posted_json + assert posted_json["message"] == "test-body" + assert "title" in posted_json + assert posted_json["title"] == "title" + + assert mock_post.call_args_list[2][0][0] == \ + "http://localhost:8123/api/services/notify/serviceA" + posted_json = json.loads(mock_post.call_args_list[2][1]["data"]) + assert "notification_id" in posted_json + assert "targets" in posted_json + assert isinstance(posted_json["targets"], list) + assert len(posted_json["targets"]) == 1 + assert "target1" in posted_json["targets"] + assert "message" in posted_json + assert posted_json["message"] == "test-body" + assert "title" in posted_json + assert posted_json["title"] == "title" + + assert mock_post.call_args_list[3][0][0] == \ + "http://localhost:8123/api/services/notify/serviceA" + posted_json = json.loads(mock_post.call_args_list[3][1]["data"]) + assert "notification_id" in posted_json + assert "targets" in posted_json + assert isinstance(posted_json["targets"], list) + assert len(posted_json["targets"]) == 1 + assert "target2" in posted_json["targets"] + assert "message" in posted_json + assert posted_json["message"] == "test-body" + assert "title" in posted_json + assert posted_json["title"] == "title" + + # Reset our mock object + mock_post.reset_mock() + + # + # Batch Processing + # + + # Now let's notify an object + obj = Apprise.instantiate( + "hassio://localhost/long.lived.token/serviceA:target1,target2/" + "service2/domain1.service3?batch=yes") + assert isinstance(obj, NotifyHomeAssistant) is True + assert isinstance(obj.url(), str) is True + + # Send Notification + assert obj.send(body="test-body", title="title") is True + + # Entries targets can be grouped + assert len(obj) == 3 + assert mock_post.call_count == 3 + + assert mock_post.call_args_list[0][0][0] == \ + "http://localhost:8123/api/services/domain1/service3" + posted_json = json.loads(mock_post.call_args_list[0][1]["data"]) + assert "notification_id" in posted_json + assert "targets" not in posted_json + assert "message" in posted_json + assert posted_json["message"] == "test-body" + assert "title" in posted_json + assert posted_json["title"] == "title" + + assert mock_post.call_args_list[1][0][0] == \ + "http://localhost:8123/api/services/notify/service2" + posted_json = json.loads(mock_post.call_args_list[1][1]["data"]) + assert "notification_id" in posted_json + assert "targets" not in posted_json + assert "message" in posted_json + assert posted_json["message"] == "test-body" + assert "title" in posted_json + assert posted_json["title"] == "title" + + assert mock_post.call_args_list[2][0][0] == \ + "http://localhost:8123/api/services/notify/serviceA" + posted_json = json.loads(mock_post.call_args_list[2][1]["data"]) + assert "notification_id" in posted_json + assert "targets" in posted_json + assert isinstance(posted_json["targets"], list) + # Our batch groups our targets + assert len(posted_json["targets"]) == 2 + assert "target1" in posted_json["targets"] + assert "target2" in posted_json["targets"] + assert "message" in posted_json + assert posted_json["message"] == "test-body" + assert "title" in posted_json + assert posted_json["title"] == "title" + + # Reset our mock object + mock_post.reset_mock() + + # + # Test error handling on multi-query request + # + + # Now let's notify an object + obj = Apprise.instantiate( + "hassio://localhost/long.lived.token/serviceA:target1,target2/" + "service2:target3,target4,target5,target6?batch=no") + + assert isinstance(obj, NotifyHomeAssistant) is True + assert isinstance(obj.url(), str) is True + + bad_response = mock.Mock() + bad_response.content = "" + bad_response.status_code = requests.codes.not_found + + mock_post.side_effect = (response, bad_response) + + # We will fail on our second message sent + assert obj.send(body="test-body", title="title") is False