mirror of https://github.com/caronc/apprise
Merge 4ecc90a94a
into 8dc7ba7bdb
commit
c73739889c
|
@ -73,7 +73,7 @@ The table below identifies the services this tool supports and some example serv
|
|||
| [Gotify](https://github.com/caronc/apprise/wiki/Notify_gotify) | gotify:// or gotifys:// | (TCP) 80 or 443 | gotify://hostname/token<br />gotifys://hostname/token?priority=high
|
||||
| [Growl](https://github.com/caronc/apprise/wiki/Notify_growl) | growl:// | (UDP) 23053 | growl://hostname<br />growl://hostname:portno<br />growl://password@hostname<br />growl://password@hostname:port</br>**Note**: you can also use the get parameter _version_ which can allow the growl request to behave using the older v1.x protocol. An example would look like: growl://hostname?version=1
|
||||
| [Guilded](https://github.com/caronc/apprise/wiki/Notify_guilded) | guilded:// | (TCP) 443 | guilded://webhook_id/webhook_token<br />guilded://avatar@webhook_id/webhook_token
|
||||
| [Home Assistant](https://github.com/caronc/apprise/wiki/Notify_homeassistant) | hassio:// or hassios:// | (TCP) 8123 or 443 | hassio://hostname/accesstoken<br />hassio://user@hostname/accesstoken<br />hassio://user:password@hostname:port/accesstoken<br />hassio://hostname/optional/path/accesstoken
|
||||
| [Home Assistant](https://github.com/caronc/apprise/wiki/Notify_homeassistant) | hassio:// or hassios:// | (TCP) 8123 or 443 | hassio://hostname/long-lived-token<br />hassio://user@hostname/long-lived-token<br />hassio://user:password@hostname:port/long-lived-token<br />hassio://hostname/optional/path/long-lived-token<br />hassio://user@hostname/long-lived-token/service<br />hassio://user@hostname/long-lived-token/Service1/Service2/ServiceN
|
||||
| [IFTTT](https://github.com/caronc/apprise/wiki/Notify_ifttt) | ifttt:// | (TCP) 443 | ifttt://webhooksID/Event<br />ifttt://webhooksID/Event1/Event2/EventN<br/>ifttt://webhooksID/Event1/?+Key=Value<br/>ifttt://webhooksID/Event1/?-Key=value1
|
||||
| [Join](https://github.com/caronc/apprise/wiki/Notify_join) | join:// | (TCP) 443 | join://apikey/device<br />join://apikey/device1/device2/deviceN/<br />join://apikey/group<br />join://apikey/groupA/groupB/groupN<br />join://apikey/DeviceA/groupA/groupN/DeviceN/
|
||||
| [KODI](https://github.com/caronc/apprise/wiki/Notify_kodi) | kodi:// or kodis:// | (TCP) 8080 or 443 | kodi://hostname<br />kodi://user@hostname<br />kodi://user:password@hostname:port
|
||||
|
|
|
@ -27,8 +27,10 @@
|
|||
|
||||
# You must generate a "Long-Lived Access Token". This can be done from your
|
||||
# Home Assistant Profile page.
|
||||
|
||||
from itertools import chain
|
||||
from json import dumps
|
||||
import math
|
||||
import re
|
||||
from uuid import uuid4
|
||||
|
||||
import requests
|
||||
|
@ -36,12 +38,40 @@ import requests
|
|||
from ..common import NotifyType
|
||||
from ..locale import gettext_lazy as _
|
||||
from ..url import PrivacyMode
|
||||
from ..utils.parse import validate_regex
|
||||
from ..utils.parse import (
|
||||
is_domain_service_target,
|
||||
parse_bool,
|
||||
parse_domain_service_targets,
|
||||
validate_regex,
|
||||
)
|
||||
from .base import NotifyBase
|
||||
|
||||
# This regex matches exactly 8 hex digits,
|
||||
# a dot, then exactly 64 hex digits. it can also be a JWT
|
||||
# token in which case it will be 180 characters+
|
||||
RE_IS_LONG_LIVED_TOKEN = re.compile(
|
||||
r"^([0-9a-f]{8}\.[0-9a-f]{64}|[a-z0-9_-]+\.[a-z0-9_-]+\.[a-z0-9_-]+)$",
|
||||
re.I)
|
||||
|
||||
# Define our supported device notification formats:
|
||||
# - service
|
||||
# - default domain is always 'notify' if one isn't detected
|
||||
# - service:target
|
||||
# - service:target1,target2,target3
|
||||
# - domain.service
|
||||
# - domain.service:target
|
||||
# - domain.service:target1,target2,target3
|
||||
# - - targets can be comma/space separated if more hten one
|
||||
# - service:target1,target2,target3
|
||||
|
||||
# Define a persistent entry (used for handling message delivery
|
||||
PERSISTENT_ENTRY = (None, None, [])
|
||||
|
||||
|
||||
class NotifyHomeAssistant(NotifyBase):
|
||||
"""A wrapper for Home Assistant Notifications."""
|
||||
"""
|
||||
A wrapper for Home Assistant Notifications
|
||||
"""
|
||||
|
||||
# The default descriptive name associated with the Notification
|
||||
service_name = "HomeAssistant"
|
||||
|
@ -58,116 +88,179 @@ class NotifyHomeAssistant(NotifyBase):
|
|||
# Default to Home Assistant Default Insecure port of 8123 instead of 80
|
||||
default_insecure_port = 8123
|
||||
|
||||
# The maximum amount of services that can be notified in a single batch
|
||||
default_batch_size = 10
|
||||
|
||||
# The default ha notification domain if one isn't detected
|
||||
default_domain = "notify"
|
||||
|
||||
# A URL that takes you to the setup/help of the specific protocol
|
||||
setup_url = "https://github.com/caronc/apprise/wiki/Notify_homeassistant"
|
||||
|
||||
# Define object templates
|
||||
templates = (
|
||||
"{schema}://{host}/{accesstoken}",
|
||||
"{schema}://{host}:{port}/{accesstoken}",
|
||||
"{schema}://{user}@{host}/{accesstoken}",
|
||||
"{schema}://{user}@{host}:{port}/{accesstoken}",
|
||||
"{schema}://{user}:{password}@{host}/{accesstoken}",
|
||||
"{schema}://{user}:{password}@{host}:{port}/{accesstoken}",
|
||||
"{schema}://{host}/{token}",
|
||||
"{schema}://{host}:{port}/{token}",
|
||||
"{schema}://{user}@{host}/{token}",
|
||||
"{schema}://{user}@{host}:{port}/{token}",
|
||||
"{schema}://{user}:{password}@{host}/{token}",
|
||||
"{schema}://{user}:{password}@{host}:{port}/{token}",
|
||||
"{schema}://{host}/{token}/{targets}",
|
||||
"{schema}://{host}:{port}/{token}/{targets}",
|
||||
"{schema}://{user}@{host}/{token}/{targets}",
|
||||
"{schema}://{user}@{host}:{port}/{token}/{targets}",
|
||||
"{schema}://{user}:{password}@{host}/{token}/{targets}",
|
||||
"{schema}://{user}:{password}@{host}:{port}/{token}/{targets}",
|
||||
)
|
||||
|
||||
# Define our template tokens
|
||||
template_tokens = dict(
|
||||
NotifyBase.template_tokens,
|
||||
**{
|
||||
"host": {
|
||||
"name": _("Hostname"),
|
||||
"type": "string",
|
||||
"required": True,
|
||||
},
|
||||
"port": {
|
||||
"name": _("Port"),
|
||||
"type": "int",
|
||||
"min": 1,
|
||||
"max": 65535,
|
||||
},
|
||||
"user": {
|
||||
"name": _("Username"),
|
||||
"type": "string",
|
||||
},
|
||||
"password": {
|
||||
"name": _("Password"),
|
||||
"type": "string",
|
||||
"private": True,
|
||||
},
|
||||
"accesstoken": {
|
||||
"name": _("Long-Lived Access Token"),
|
||||
"type": "string",
|
||||
"private": True,
|
||||
"required": True,
|
||||
},
|
||||
template_tokens = dict(NotifyBase.template_tokens, **{
|
||||
"host": {
|
||||
"name": _("Hostname"),
|
||||
"type": "string",
|
||||
"required": True,
|
||||
},
|
||||
)
|
||||
"port": {
|
||||
"name": _("Port"),
|
||||
"type": "int",
|
||||
"min": 1,
|
||||
"max": 65535,
|
||||
},
|
||||
"user": {
|
||||
"name": _("Username"),
|
||||
"type": "string",
|
||||
},
|
||||
"password": {
|
||||
"name": _("Password"),
|
||||
"type": "string",
|
||||
"private": True,
|
||||
},
|
||||
"token": {
|
||||
"name": _("Long-Lived Access Token"),
|
||||
"type": "string",
|
||||
"private": True,
|
||||
"required": True,
|
||||
},
|
||||
"target_device": {
|
||||
"name": _("Target Device"),
|
||||
"type": "string",
|
||||
"map_to": "targets",
|
||||
},
|
||||
"targets": {
|
||||
"name": _("Targets"),
|
||||
"type": "list:string",
|
||||
},
|
||||
})
|
||||
|
||||
# Define our template arguments
|
||||
template_args = dict(
|
||||
NotifyBase.template_args,
|
||||
**{
|
||||
"nid": {
|
||||
# Optional Unique Notification ID
|
||||
"name": _("Notification ID"),
|
||||
"type": "string",
|
||||
"regex": (r"^[a-z0-9_-]+$", "i"),
|
||||
},
|
||||
template_args = dict(NotifyBase.template_args, **{
|
||||
"nid": {
|
||||
# Optional Unique Notification ID
|
||||
"name": _("Notification ID"),
|
||||
"type": "string",
|
||||
"regex": (r"^[a-z0-9_-]+$", "i"),
|
||||
},
|
||||
)
|
||||
"prefix": {
|
||||
# Path Prefix to use (for those not hosting their hasio instance
|
||||
# in /)
|
||||
"name": _("Path Prefix"),
|
||||
"type": "string",
|
||||
},
|
||||
"batch": {
|
||||
"name": _("Batch Mode"),
|
||||
"type": "bool",
|
||||
"default": False,
|
||||
},
|
||||
"to": {
|
||||
"alias_of": "targets",
|
||||
},
|
||||
})
|
||||
|
||||
def __init__(self, accesstoken, nid=None, **kwargs):
|
||||
"""Initialize Home Assistant Object."""
|
||||
def __init__(self, token, nid=None, targets=None, prefix=None,
|
||||
batch=None, **kwargs):
|
||||
"""
|
||||
Initialize Home Assistant Object
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self.fullpath = kwargs.get("fullpath", "")
|
||||
self.prefix = prefix or kwargs.get("fullpath", "")
|
||||
|
||||
if not (self.secure or self.port):
|
||||
# Use default insecure port
|
||||
self.port = self.default_insecure_port
|
||||
|
||||
# Long-Lived Access token (generated from User Profile)
|
||||
self.accesstoken = validate_regex(accesstoken)
|
||||
if not self.accesstoken:
|
||||
msg = (
|
||||
"An invalid Home Assistant Long-Lived Access Token "
|
||||
f"({accesstoken}) was specified."
|
||||
)
|
||||
self.token = validate_regex(token)
|
||||
if not self.token:
|
||||
msg = "An invalid Home Assistant Long-Lived Access Token " \
|
||||
"({}) was specified.".format(token)
|
||||
self.logger.warning(msg)
|
||||
raise TypeError(msg)
|
||||
|
||||
# An Optional Notification Identifier
|
||||
self.nid = None
|
||||
if nid:
|
||||
self.nid = validate_regex(nid, *self.template_args["nid"]["regex"])
|
||||
self.nid = validate_regex(
|
||||
nid, *self.template_args["nid"]["regex"])
|
||||
if not self.nid:
|
||||
msg = (
|
||||
"An invalid Home Assistant Notification Identifier "
|
||||
f"({nid}) was specified."
|
||||
)
|
||||
msg = "An invalid Home Assistant Notification Identifier " \
|
||||
"({}) was specified.".format(nid)
|
||||
self.logger.warning(msg)
|
||||
raise TypeError(msg)
|
||||
|
||||
# Prepare Batch Mode Flag
|
||||
self.batch = self.template_args["batch"]["default"] \
|
||||
if batch is None else batch
|
||||
|
||||
# Store our targets
|
||||
self.targets = []
|
||||
|
||||
# Track our invalid targets
|
||||
self._invalid_targets = []
|
||||
|
||||
if targets:
|
||||
for target in parse_domain_service_targets(targets):
|
||||
result = is_domain_service_target(
|
||||
target, domain=self.default_domain)
|
||||
if result:
|
||||
self.targets.append((
|
||||
result["domain"],
|
||||
result["service"],
|
||||
result["targets"],
|
||||
))
|
||||
continue
|
||||
|
||||
self.logger.warning(
|
||||
"Dropped invalid [domain.]service[:target] entry "
|
||||
"({}) specified.".format(target),
|
||||
)
|
||||
self._invalid_targets.append(target)
|
||||
else:
|
||||
self.targets = [PERSISTENT_ENTRY]
|
||||
|
||||
return
|
||||
|
||||
def send(self, body, title="", notify_type=NotifyType.INFO, **kwargs):
|
||||
"""Sends Message."""
|
||||
"""
|
||||
Sends Message
|
||||
"""
|
||||
|
||||
if not self.targets:
|
||||
self.logger.warning(
|
||||
"There are no valid Home Assistant targets to notify.")
|
||||
return False
|
||||
|
||||
# Prepare our persistent_notification.create payload
|
||||
payload = {
|
||||
"title": title,
|
||||
"message": body,
|
||||
# Use a unique ID so we don't over-write the last message
|
||||
# we posted. Otherwise use the notification id specified
|
||||
"notification_id": self.nid if self.nid else str(uuid4()),
|
||||
}
|
||||
|
||||
# Prepare our headers
|
||||
headers = {
|
||||
"User-Agent": self.app_id,
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": f"Bearer {self.accesstoken}",
|
||||
"Authorization": "Bearer {}".format(self.token),
|
||||
}
|
||||
|
||||
auth = None
|
||||
|
@ -177,24 +270,66 @@ class NotifyHomeAssistant(NotifyBase):
|
|||
# Set our schema
|
||||
schema = "https" if self.secure else "http"
|
||||
|
||||
url = f"{schema}://{self.host}"
|
||||
url = "{}://{}".format(schema, self.host)
|
||||
if isinstance(self.port, int):
|
||||
url += f":{self.port}"
|
||||
|
||||
url += (
|
||||
self.fullpath.rstrip("/")
|
||||
+ "/api/services/persistent_notification/create"
|
||||
)
|
||||
# Determine if we're doing it the old way (using persistent notices)
|
||||
# or the new (supporting device targets)
|
||||
has_targets = \
|
||||
bool(not self.targets or self.targets[0] is not PERSISTENT_ENTRY)
|
||||
|
||||
# our base url
|
||||
base_url = url + self.prefix.rstrip("/") + \
|
||||
"/api/services/persistent_notification/create"
|
||||
|
||||
# Send in batches if identified to do so
|
||||
batch_size = 1 if not self.batch else self.default_batch_size
|
||||
|
||||
for target in self.targets:
|
||||
# Use a unique ID so we don't over-write the last message we
|
||||
# posted. Otherwise use the notification id specified
|
||||
if has_targets:
|
||||
# Base target details
|
||||
domain = target[0]
|
||||
service = target[1]
|
||||
|
||||
# Prepare our URL
|
||||
base_url = url + self.prefix.rstrip("/") + \
|
||||
f"/api/services/{domain}/{service}"
|
||||
|
||||
# Possibly prepare batches
|
||||
if target[2]:
|
||||
_payload = payload.copy()
|
||||
for index in range(0, len(target[2]), batch_size):
|
||||
_payload["targets"] = \
|
||||
target[2][index:index + batch_size]
|
||||
if not self._ha_post(
|
||||
base_url, _payload, headers, auth):
|
||||
return False
|
||||
|
||||
# We're done
|
||||
return True
|
||||
|
||||
if not self._ha_post(base_url, payload, headers, auth):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def _ha_post(self, url, payload, headers, auth=None):
|
||||
"""
|
||||
Wrapper to single upstream server post
|
||||
"""
|
||||
# Notification ID
|
||||
payload["notification_id"] = self.nid if self.nid else str(uuid4())
|
||||
|
||||
self.logger.debug(
|
||||
"Home Assistant POST URL:"
|
||||
f" {url} (cert_verify={self.verify_certificate!r})"
|
||||
)
|
||||
self.logger.debug(f"Home Assistant Payload: {payload!s}")
|
||||
"Home Assistant POST URL: {} (cert_verify={!r})".format(
|
||||
url, self.verify_certificate))
|
||||
self.logger.debug("Home Assistant Payload: {}".format(str(payload)))
|
||||
|
||||
# Always call throttle before any remote server i/o is made
|
||||
self.throttle()
|
||||
|
||||
try:
|
||||
r = requests.post(
|
||||
url,
|
||||
|
@ -206,18 +341,18 @@ class NotifyHomeAssistant(NotifyBase):
|
|||
)
|
||||
if r.status_code != requests.codes.ok:
|
||||
# We had a problem
|
||||
status_str = NotifyHomeAssistant.http_response_code_lookup(
|
||||
r.status_code
|
||||
)
|
||||
status_str = \
|
||||
NotifyHomeAssistant.http_response_code_lookup(
|
||||
r.status_code)
|
||||
|
||||
self.logger.warning(
|
||||
"Failed to send Home Assistant notification: "
|
||||
"{}{}error={}.".format(
|
||||
status_str, ", " if status_str else "", r.status_code
|
||||
)
|
||||
)
|
||||
status_str,
|
||||
", " if status_str else "",
|
||||
r.status_code))
|
||||
|
||||
self.logger.debug(f"Response Details:\r\n{r.content}")
|
||||
self.logger.debug("Response Details:\r\n{}".format(r.content))
|
||||
|
||||
# Return; we're done
|
||||
return False
|
||||
|
@ -228,9 +363,8 @@ class NotifyHomeAssistant(NotifyBase):
|
|||
except requests.RequestException as e:
|
||||
self.logger.warning(
|
||||
"A Connection error occurred sending Home Assistant "
|
||||
f"notification to {self.host}."
|
||||
)
|
||||
self.logger.debug(f"Socket Exception: {e!s}")
|
||||
"notification to {}.".format(self.host))
|
||||
self.logger.debug("Socket Exception: {}".format(str(e)))
|
||||
|
||||
# Return; we're done
|
||||
return False
|
||||
|
@ -239,30 +373,34 @@ class NotifyHomeAssistant(NotifyBase):
|
|||
|
||||
@property
|
||||
def url_identifier(self):
|
||||
"""Returns all of the identifiers that make this URL unique from
|
||||
another simliar one.
|
||||
|
||||
Targets or end points should never be identified here.
|
||||
"""
|
||||
Returns all of the identifiers that make this URL unique from
|
||||
another simliar one. Targets or end points should never be identified
|
||||
here.
|
||||
"""
|
||||
return (
|
||||
self.secure_protocol if self.secure else self.protocol,
|
||||
self.user,
|
||||
self.password,
|
||||
self.host,
|
||||
(
|
||||
self.port
|
||||
if self.port
|
||||
else (443 if self.secure else self.default_insecure_port)
|
||||
),
|
||||
self.fullpath.rstrip("/"),
|
||||
self.accesstoken,
|
||||
self.user, self.password, self.host,
|
||||
self.port if self.port else (
|
||||
443 if self.secure else self.default_insecure_port),
|
||||
self.prefix.rstrip("/"),
|
||||
self.token,
|
||||
)
|
||||
|
||||
def url(self, privacy=False, *args, **kwargs):
|
||||
"""Returns the URL built dynamically based on specified arguments."""
|
||||
"""
|
||||
Returns the URL built dynamically based on specified arguments.
|
||||
"""
|
||||
|
||||
# Define any URL parameters
|
||||
params = {}
|
||||
params = {
|
||||
"batch": "yes" if self.batch else "no",
|
||||
}
|
||||
|
||||
if self.prefix not in ("", "/"):
|
||||
params["prefix"] = "/" if not self.prefix \
|
||||
else "/{}/".format(self.prefix.strip("/"))
|
||||
|
||||
if self.nid:
|
||||
params["nid"] = self.nid
|
||||
|
||||
|
@ -275,8 +413,7 @@ class NotifyHomeAssistant(NotifyBase):
|
|||
auth = "{user}:{password}@".format(
|
||||
user=NotifyHomeAssistant.quote(self.user, safe=""),
|
||||
password=self.pprint(
|
||||
self.password, privacy, mode=PrivacyMode.Secret, safe=""
|
||||
),
|
||||
self.password, privacy, mode=PrivacyMode.Secret, safe=""),
|
||||
)
|
||||
elif self.user:
|
||||
auth = "{user}@".format(
|
||||
|
@ -285,65 +422,123 @@ class NotifyHomeAssistant(NotifyBase):
|
|||
|
||||
default_port = 443 if self.secure else self.default_insecure_port
|
||||
|
||||
url = (
|
||||
"{schema}://{auth}{hostname}{port}{fullpath}"
|
||||
"{accesstoken}/?{params}"
|
||||
)
|
||||
url = "{schema}://{auth}{hostname}{port}/" \
|
||||
"{token}/{targets}?{params}"
|
||||
|
||||
# Determine if we're doing it the old way (using persistent notices)
|
||||
# or the new (supporting device targets)
|
||||
has_targets = \
|
||||
bool(not self.targets or self.targets[0] is not PERSISTENT_ENTRY)
|
||||
|
||||
return url.format(
|
||||
schema=self.secure_protocol if self.secure else self.protocol,
|
||||
auth=auth,
|
||||
# never encode hostname since we're expecting it to be a valid one
|
||||
hostname=self.host,
|
||||
port=(
|
||||
""
|
||||
if not self.port or self.port == default_port
|
||||
else f":{self.port}"
|
||||
),
|
||||
fullpath=(
|
||||
"/"
|
||||
if not self.fullpath
|
||||
else "/{}/".format(
|
||||
NotifyHomeAssistant.quote(
|
||||
self.fullpath.strip("/"), safe="/"
|
||||
)
|
||||
)
|
||||
),
|
||||
accesstoken=self.pprint(self.accesstoken, privacy, safe=""),
|
||||
port="" if not self.port or self.port == default_port
|
||||
else ":{}".format(self.port),
|
||||
token=self.pprint(self.token, privacy, safe=""),
|
||||
targets="" if not has_targets else "/".join(
|
||||
chain([NotifyHomeAssistant.quote("{}.{}{}".format(
|
||||
x[0], x[1], ""
|
||||
if not x[2] else ":" + ",".join(x[2])), safe="")
|
||||
for x in self.targets],
|
||||
[NotifyHomeAssistant.quote(x, safe="")
|
||||
for x in self._invalid_targets])),
|
||||
params=NotifyHomeAssistant.urlencode(params),
|
||||
)
|
||||
|
||||
def __len__(self):
|
||||
"""
|
||||
Returns the number of targets associated with this notification
|
||||
"""
|
||||
#
|
||||
# Factor batch into calculation
|
||||
#
|
||||
|
||||
# Determine if we're doing it the old way (using persistent notices)
|
||||
# or the new (supporting device targets)
|
||||
has_targets = \
|
||||
bool(not self.targets or self.targets[0] is not PERSISTENT_ENTRY)
|
||||
|
||||
if not has_targets:
|
||||
return 1
|
||||
|
||||
# Handle targets
|
||||
batch_size = 1 if not self.batch else self.default_batch_size
|
||||
return sum(
|
||||
math.ceil(len(identities) / batch_size)
|
||||
if identities else 1 for _, _, identities in self.targets)
|
||||
|
||||
@staticmethod
|
||||
def parse_url(url):
|
||||
"""Parses the URL and returns enough arguments that can allow us to re-
|
||||
instantiate this object."""
|
||||
"""
|
||||
Parses the URL and returns enough arguments that can allow
|
||||
us to re-instantiate this object.
|
||||
|
||||
"""
|
||||
|
||||
results = NotifyBase.parse_url(url, verify_host=False)
|
||||
if not results:
|
||||
# We're done early as we couldn't load the results
|
||||
return results
|
||||
|
||||
# Get our Long-Lived Access Token
|
||||
if "accesstoken" in results["qsd"] and len(
|
||||
results["qsd"]["accesstoken"]
|
||||
):
|
||||
results["accesstoken"] = NotifyHomeAssistant.unquote(
|
||||
results["qsd"]["accesstoken"]
|
||||
)
|
||||
# Set our path to use:
|
||||
if "prefix" in results["qsd"] and len(results["qsd"]["prefix"]):
|
||||
results["prefix"] = \
|
||||
NotifyHomeAssistant.unquote(results["qsd"]["prefix"])
|
||||
|
||||
else:
|
||||
# Acquire our full path
|
||||
fullpath = NotifyHomeAssistant.split_path(results["fullpath"])
|
||||
# Long Lived Access token placeholder
|
||||
results["token"] = None
|
||||
|
||||
# Otherwise pop the last element from our path to be it
|
||||
results["accesstoken"] = fullpath.pop() if fullpath else None
|
||||
# Get our Long-Lived Access Token (if defined)
|
||||
if "token" in results["qsd"] and \
|
||||
len(results["qsd"]["token"]):
|
||||
results["token"] = \
|
||||
NotifyHomeAssistant.unquote(results["qsd"]["token"])
|
||||
|
||||
# Re-assemble our full path
|
||||
results["fullpath"] = "/" + "/".join(fullpath) if fullpath else ""
|
||||
# Acquire our full path
|
||||
tokens = NotifyHomeAssistant.split_path(results["fullpath"])
|
||||
results["targets"] = []
|
||||
|
||||
while tokens:
|
||||
# Iterate through our tokens
|
||||
token = tokens.pop()
|
||||
if not results["token"]:
|
||||
if RE_IS_LONG_LIVED_TOKEN.match(token):
|
||||
# Store our access token
|
||||
results["token"] = token
|
||||
|
||||
# Re-assemble our full path
|
||||
results["fullpath"] = "/" + "/".join(tokens)
|
||||
continue
|
||||
|
||||
# If we don't have an access token, then we can assume
|
||||
# it's a device we're storing
|
||||
results["targets"].append(token)
|
||||
continue
|
||||
|
||||
elif "prefix" not in results:
|
||||
# Re-assemble our full path
|
||||
results["fullpath"] = "/" + "/".join([*tokens, token])
|
||||
|
||||
# We're done
|
||||
break
|
||||
|
||||
# prefix is in the result set, so therefore we're dealing with a
|
||||
# custom target/service
|
||||
results["targets"].append(token)
|
||||
|
||||
# Get Batch Mode Flag
|
||||
results["batch"] = \
|
||||
parse_bool(results["qsd"].get(
|
||||
"batch",
|
||||
NotifyHomeAssistant.template_args["batch"]["default"]))
|
||||
|
||||
# Allow the specification of a unique notification_id so that
|
||||
# it will always replace the last one sent.
|
||||
if "nid" in results["qsd"] and len(results["qsd"]["nid"]):
|
||||
results["nid"] = NotifyHomeAssistant.unquote(results["qsd"]["nid"])
|
||||
results["nid"] = \
|
||||
NotifyHomeAssistant.unquote(results["qsd"]["nid"])
|
||||
|
||||
return results
|
||||
|
|
|
@ -100,6 +100,15 @@ PHONE_NO_DETECTION_RE = re.compile(
|
|||
r"\s*([+(\s]*[0-9][0-9()\s-]+[0-9])(?=$|[\s,+(]+[0-9])", re.I
|
||||
)
|
||||
|
||||
IS_DOMAIN_SERVICE_TARGET = re.compile(
|
||||
r"\s*((?P<domain>[a-z0-9_-]+)\.)?(?P<service>[a-z0-9_-]+)"
|
||||
r"(:(?P<targets>[a-z0-9_,-]+))?", re.I)
|
||||
|
||||
DOMAIN_SERVICE_TARGET_DETECTION_RE = re.compile(
|
||||
r"\s*((?:[a-z0-9_-]+\.)?[a-z0-9_-]+"
|
||||
r"(?::(?:[a-z0-9_-]+(?:,+[a-z0-9_-]+)+?))?)"
|
||||
r"(?=$|(?:\s|,+\s|\s,+)+(?:[a-z0-9_-]+\.)?[a-z0-9_-]+)", re.I)
|
||||
|
||||
# Support for prefix: (string followed by colon) infront of phone no
|
||||
PHONE_NO_WPREFIX_DETECTION_RE = re.compile(
|
||||
r"\s*((?:[a-z]+:)?[+(\s]*[0-9][0-9()\s-]+[0-9])"
|
||||
|
@ -262,6 +271,44 @@ def is_uuid(uuid):
|
|||
return bool(match)
|
||||
|
||||
|
||||
def is_domain_service_target(entry, domain="notify"):
|
||||
"""Determine if the specified entry a domain.service:target type
|
||||
|
||||
Expects a string containing the following formats:
|
||||
- service
|
||||
- service:target
|
||||
- service:target1,target2
|
||||
- domain.service:target
|
||||
- domain.service:target1,target2
|
||||
|
||||
Args:
|
||||
entry (str): The string you want to check.
|
||||
|
||||
Returns:
|
||||
bool: Returns False if the entry specified is domain.service:target
|
||||
"""
|
||||
|
||||
try:
|
||||
result = IS_DOMAIN_SERVICE_TARGET.match(entry)
|
||||
if not result:
|
||||
# not parseable content as it does not even conform closely to a
|
||||
# domain.service:target
|
||||
return False
|
||||
|
||||
except TypeError:
|
||||
# not parseable content
|
||||
return False
|
||||
|
||||
return {
|
||||
# Store domain or set default if not acquired
|
||||
"domain": result.group("domain") if result.group("domain") else domain,
|
||||
# store service
|
||||
"service": result.group("service"),
|
||||
# store targets if defined
|
||||
"targets": parse_list(result.group("targets"))
|
||||
}
|
||||
|
||||
|
||||
def is_phone_no(phone, min_len=10):
|
||||
"""Determine if the specified entry is a phone number.
|
||||
|
||||
|
@ -358,7 +405,7 @@ def is_call_sign(callsign):
|
|||
callsign (str): The string you want to check.
|
||||
|
||||
Returns:
|
||||
bool: Returns False if the address specified is not a phone number
|
||||
bool: Returns False if the enry specified is not a callsign
|
||||
"""
|
||||
|
||||
try:
|
||||
|
@ -850,6 +897,47 @@ def parse_bool(arg, default=False):
|
|||
return bool(arg)
|
||||
|
||||
|
||||
def parse_domain_service_targets(
|
||||
*args, store_unparseable=True, domain="notify", **kwargs):
|
||||
"""
|
||||
Takes a string containing the following formats separated by space
|
||||
- service
|
||||
- service:target
|
||||
- service:target1,target2
|
||||
- domain.service:target
|
||||
- domain.service:target1,target2
|
||||
|
||||
If no domain is parsed, the default domain is returned.
|
||||
|
||||
Targets can be comma separated (if multiple are to be defined)
|
||||
"""
|
||||
|
||||
result = []
|
||||
for arg in args:
|
||||
if isinstance(arg, str) and arg:
|
||||
_result = DOMAIN_SERVICE_TARGET_DETECTION_RE.findall(arg)
|
||||
if _result:
|
||||
result += _result
|
||||
|
||||
elif not _result and store_unparseable:
|
||||
# we had content passed into us that was lost because it was
|
||||
# so poorly formatted that it didn't even come close to
|
||||
# meeting the regular expression we defined. We intentially
|
||||
# keep it as part of our result set so that parsing done
|
||||
# at a higher level can at least report this to the end user
|
||||
# and hopefully give them some indication as to what they
|
||||
# may have done wrong.
|
||||
result += \
|
||||
list(filter(bool, re.split(STRING_DELIMITERS, arg)))
|
||||
|
||||
elif isinstance(arg, (set, list, tuple)):
|
||||
# Use recursion to handle the list of phone numbers
|
||||
result += parse_domain_service_targets(
|
||||
*arg, store_unparseable=store_unparseable, domain=domain)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def parse_phone_no(*args, store_unparseable=True, prefix=False, **kwargs):
|
||||
"""Takes a string containing phone numbers separated by comma's and/or
|
||||
spaces and returns a list."""
|
||||
|
|
|
@ -1665,6 +1665,65 @@ def test_is_email():
|
|||
assert results["user"] == "a-z0-9_!#$%&*/=?%`{|}~^.-"
|
||||
|
||||
|
||||
def test_is_domain_service_target():
|
||||
"""
|
||||
API: is_domain_service_target() function
|
||||
|
||||
"""
|
||||
# Invalid information
|
||||
assert utils.parse.is_domain_service_target(None) is False
|
||||
assert utils.parse.is_domain_service_target(42) is False
|
||||
assert utils.parse.is_domain_service_target(object) is False
|
||||
assert utils.parse.is_domain_service_target("") is False
|
||||
assert utils.parse.is_domain_service_target("+()") is False
|
||||
assert utils.parse.is_domain_service_target("+") is False
|
||||
|
||||
# Valid entries
|
||||
result = utils.parse.is_domain_service_target("service")
|
||||
assert isinstance(result, dict)
|
||||
assert result["service"] == "service"
|
||||
# Default domain
|
||||
assert result["domain"] == "notify"
|
||||
assert isinstance(result["targets"], list)
|
||||
assert len(result["targets"]) == 0
|
||||
|
||||
result = utils.parse.is_domain_service_target("domain.service")
|
||||
assert isinstance(result, dict)
|
||||
assert result["service"] == "service"
|
||||
assert result["domain"] == "domain"
|
||||
assert isinstance(result["targets"], list)
|
||||
assert len(result["targets"]) == 0
|
||||
|
||||
result = utils.parse.is_domain_service_target("domain.service:target")
|
||||
assert isinstance(result, dict)
|
||||
assert result["service"] == "service"
|
||||
assert result["domain"] == "domain"
|
||||
assert isinstance(result["targets"], list)
|
||||
assert len(result["targets"]) == 1
|
||||
assert result["targets"][0] == "target"
|
||||
|
||||
result = utils.parse.is_domain_service_target("domain.service:t1,t2,t3")
|
||||
assert isinstance(result, dict)
|
||||
assert result["service"] == "service"
|
||||
assert result["domain"] == "domain"
|
||||
assert isinstance(result["targets"], list)
|
||||
assert len(result["targets"]) == 3
|
||||
assert "t1" in result["targets"]
|
||||
assert "t2" in result["targets"]
|
||||
assert "t3" in result["targets"]
|
||||
|
||||
result = utils.parse.is_domain_service_target(
|
||||
"service:t1,t2,t3", domain="new_default")
|
||||
assert isinstance(result, dict)
|
||||
assert result["service"] == "service"
|
||||
# Default domain
|
||||
assert result["domain"] == "new_default"
|
||||
assert isinstance(result["targets"], list)
|
||||
assert len(result["targets"]) == 3
|
||||
assert "t1" in result["targets"]
|
||||
assert "t2" in result["targets"]
|
||||
assert "t3" in result["targets"]
|
||||
|
||||
def test_is_call_sign_no():
|
||||
"""
|
||||
API: is_call_sign() function
|
||||
|
@ -1858,6 +1917,57 @@ def test_parse_call_sign():
|
|||
assert "DF1ABC" in results
|
||||
|
||||
|
||||
def test_parse_domain_service_targets():
|
||||
"""utils: parse_domain_service_targets() testing """
|
||||
|
||||
results = utils.parse.parse_domain_service_targets("")
|
||||
assert isinstance(results, list)
|
||||
assert len(results) == 0
|
||||
|
||||
results = utils.parse.parse_domain_service_targets("service1 service2")
|
||||
assert isinstance(results, list)
|
||||
assert len(results) == 2
|
||||
assert "service1" in results
|
||||
assert "service2" in results
|
||||
|
||||
results = utils.parse.parse_domain_service_targets(
|
||||
"service1:target1,target2")
|
||||
assert isinstance(results, list)
|
||||
assert len(results) == 1
|
||||
assert "service1:target1,target2" in results
|
||||
|
||||
results = utils.parse.parse_domain_service_targets(
|
||||
"service1:target1,target2 service2 domain.service3")
|
||||
assert isinstance(results, list)
|
||||
assert len(results) == 3
|
||||
assert "service1:target1,target2" in results
|
||||
assert "service2" in results
|
||||
assert "domain.service3" in results
|
||||
|
||||
# Support a comma in the space between entries
|
||||
results = utils.parse.parse_domain_service_targets(
|
||||
"service1:target1,target2, service2 ,domain.service3,"
|
||||
" , , service4")
|
||||
assert isinstance(results, list)
|
||||
assert len(results) == 4
|
||||
assert "service1:target1,target2" in results
|
||||
assert "service2" in results
|
||||
assert "domain.service3" in results
|
||||
assert "service4" in results
|
||||
|
||||
results = utils.parse.parse_domain_service_targets(
|
||||
"service:target1,target2")
|
||||
assert isinstance(results, list)
|
||||
assert len(results) == 1
|
||||
assert "service:target1,target2" in results
|
||||
|
||||
# Handle unparseables
|
||||
results = utils.parse.parse_domain_service_targets(
|
||||
": %invalid ^entries%", store_unparseable=False)
|
||||
assert isinstance(results, list)
|
||||
assert len(results) == 0
|
||||
|
||||
|
||||
def test_parse_phone_no():
|
||||
"""utils: parse_phone_no() testing"""
|
||||
# A simple single array entry (As str)
|
||||
|
|
|
@ -25,6 +25,8 @@
|
|||
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||||
# POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
import json
|
||||
|
||||
# Disable logging for a cleaner testing output
|
||||
import logging
|
||||
from unittest import mock
|
||||
|
@ -39,134 +41,110 @@ logging.disable(logging.CRITICAL)
|
|||
|
||||
# Our Testing URLs
|
||||
apprise_url_tests = (
|
||||
(
|
||||
"hassio://:@/",
|
||||
{
|
||||
"instance": TypeError,
|
||||
},
|
||||
),
|
||||
(
|
||||
"hassio://",
|
||||
{
|
||||
"instance": TypeError,
|
||||
},
|
||||
),
|
||||
(
|
||||
"hassios://",
|
||||
{
|
||||
"instance": TypeError,
|
||||
},
|
||||
),
|
||||
("hassio://:@/", {
|
||||
"instance": TypeError,
|
||||
}),
|
||||
("hassio://", {
|
||||
"instance": TypeError,
|
||||
}),
|
||||
("hassios://", {
|
||||
"instance": TypeError,
|
||||
}),
|
||||
# No Long Lived Access Token specified
|
||||
(
|
||||
"hassio://user@localhost",
|
||||
{
|
||||
"instance": TypeError,
|
||||
},
|
||||
),
|
||||
(
|
||||
"hassio://localhost/long-lived-access-token",
|
||||
{
|
||||
"instance": NotifyHomeAssistant,
|
||||
},
|
||||
),
|
||||
(
|
||||
"hassio://user:pass@localhost/long-lived-access-token/",
|
||||
{
|
||||
"instance": NotifyHomeAssistant,
|
||||
# Our expected url(privacy=True) startswith() response:
|
||||
"privacy_url": "hassio://user:****@localhost/l...n",
|
||||
},
|
||||
),
|
||||
(
|
||||
"hassio://localhost:80/long-lived-access-token",
|
||||
{
|
||||
"instance": NotifyHomeAssistant,
|
||||
},
|
||||
),
|
||||
(
|
||||
"hassio://user@localhost:8123/llat",
|
||||
{
|
||||
"instance": NotifyHomeAssistant,
|
||||
"privacy_url": "hassio://user@localhost/l...t",
|
||||
},
|
||||
),
|
||||
(
|
||||
"hassios://localhost/llat?nid=!%",
|
||||
{
|
||||
# Invalid notification_id
|
||||
"instance": TypeError,
|
||||
},
|
||||
),
|
||||
(
|
||||
"hassios://localhost/llat?nid=abcd",
|
||||
{
|
||||
# Valid notification_id
|
||||
"instance": NotifyHomeAssistant,
|
||||
},
|
||||
),
|
||||
(
|
||||
"hassios://user:pass@localhost/llat",
|
||||
{
|
||||
"instance": NotifyHomeAssistant,
|
||||
"privacy_url": "hassios://user:****@localhost/l...t",
|
||||
},
|
||||
),
|
||||
(
|
||||
"hassios://localhost:8443/path/llat/",
|
||||
{
|
||||
"instance": NotifyHomeAssistant,
|
||||
"privacy_url": "hassios://localhost:8443/path/l...t",
|
||||
},
|
||||
),
|
||||
(
|
||||
"hassio://localhost:8123/a/path?accesstoken=llat",
|
||||
{
|
||||
"instance": NotifyHomeAssistant,
|
||||
# Default port; so it's stripped off
|
||||
# accesstoken was specified as kwarg
|
||||
"privacy_url": "hassio://localhost/a/path/l...t",
|
||||
},
|
||||
),
|
||||
(
|
||||
"hassios://user:password@localhost:80/llat/",
|
||||
{
|
||||
"instance": NotifyHomeAssistant,
|
||||
"privacy_url": "hassios://user:****@localhost:80",
|
||||
},
|
||||
),
|
||||
(
|
||||
"hassio://user:pass@localhost:8123/llat",
|
||||
{
|
||||
"instance": NotifyHomeAssistant,
|
||||
# force a failure
|
||||
"response": False,
|
||||
"requests_response_code": requests.codes.internal_server_error,
|
||||
},
|
||||
),
|
||||
(
|
||||
"hassio://user:pass@localhost/llat",
|
||||
{
|
||||
"instance": NotifyHomeAssistant,
|
||||
# throw a bizzare code forcing us to fail to look it up
|
||||
"response": False,
|
||||
"requests_response_code": 999,
|
||||
},
|
||||
),
|
||||
(
|
||||
"hassio://user:pass@localhost/llat",
|
||||
{
|
||||
"instance": NotifyHomeAssistant,
|
||||
# Throws a series of i/o exceptions with this flag
|
||||
# is set and tests that we gracfully handle them
|
||||
"test_requests_exceptions": True,
|
||||
},
|
||||
),
|
||||
("hassio://user@localhost", {
|
||||
"instance": TypeError,
|
||||
}),
|
||||
("hassio://localhost/long.lived.token", {
|
||||
"instance": NotifyHomeAssistant,
|
||||
}),
|
||||
("hassio://localhost/prefix/path/long.lived.token", {
|
||||
"instance": NotifyHomeAssistant,
|
||||
}),
|
||||
("hassio://localhost/long.lived.token?prefix=/ha", {
|
||||
"instance": NotifyHomeAssistant,
|
||||
}),
|
||||
("hassio://localhost/service/?token=long.lived.token&prefix=/ha", {
|
||||
"instance": NotifyHomeAssistant,
|
||||
}),
|
||||
("hassio://localhost/?token=long.lived.token&prefix=/ha&to=service", {
|
||||
"instance": NotifyHomeAssistant,
|
||||
}),
|
||||
("hassio://localhost/service/$%/?token=long.lived.token&prefix=/ha", {
|
||||
# Tests an invalid service entry
|
||||
"instance": NotifyHomeAssistant,
|
||||
}),
|
||||
("hassio://localhost/%only%/%invalid%/?token=lng.lived.token&prefix=/ha", {
|
||||
# Tests an invalid service entry
|
||||
"instance": NotifyHomeAssistant,
|
||||
# we'll have a notify response failure in this case
|
||||
"notify_response": False,
|
||||
}),
|
||||
("hassio://user:pass@localhost/long.lived.token/", {
|
||||
"instance": NotifyHomeAssistant,
|
||||
|
||||
# Our expected url(privacy=True) startswith() response:
|
||||
"privacy_url": "hassio://user:****@localhost/l...n",
|
||||
}),
|
||||
("hassio://localhost:80/long.lived.token", {
|
||||
"instance": NotifyHomeAssistant,
|
||||
}),
|
||||
("hassio://user@localhost:8123/long.lived.token", {
|
||||
"instance": NotifyHomeAssistant,
|
||||
"privacy_url": "hassio://user@localhost/l...n",
|
||||
}),
|
||||
("hassios://localhost/long.lived.token?nid=!%", {
|
||||
# Invalid notification_id
|
||||
"instance": TypeError,
|
||||
}),
|
||||
("hassios://localhost/long.lived.token?nid=abcd", {
|
||||
# Valid notification_id
|
||||
"instance": NotifyHomeAssistant,
|
||||
}),
|
||||
("hassios://user:pass@localhost/long.lived.token", {
|
||||
"instance": NotifyHomeAssistant,
|
||||
"privacy_url": "hassios://user:****@localhost/l...n",
|
||||
}),
|
||||
("hassios://localhost:8443/path/long.lived.token/", {
|
||||
"instance": NotifyHomeAssistant,
|
||||
"privacy_url": "hassios://localhost:8443/l...n",
|
||||
}),
|
||||
("hassio://localhost:8123/a/path?token=long.lived.token", {
|
||||
"instance": NotifyHomeAssistant,
|
||||
# Default port; so it's stripped off
|
||||
# token was specified as kwarg
|
||||
"privacy_url": "hassio://localhost/l...n",
|
||||
}),
|
||||
("hassios://user:password@localhost:80/long.lived.token/", {
|
||||
"instance": NotifyHomeAssistant,
|
||||
|
||||
"privacy_url": "hassios://user:****@localhost:80",
|
||||
}),
|
||||
("hassio://user:pass@localhost:8123/long.lived.token", {
|
||||
"instance": NotifyHomeAssistant,
|
||||
# force a failure
|
||||
"response": False,
|
||||
"requests_response_code": requests.codes.internal_server_error,
|
||||
}),
|
||||
("hassio://user:pass@localhost/long.lived.token", {
|
||||
"instance": NotifyHomeAssistant,
|
||||
# throw a bizzare code forcing us to fail to look it up
|
||||
"response": False,
|
||||
"requests_response_code": 999,
|
||||
}),
|
||||
("hassio://user:pass@localhost/long.lived.token", {
|
||||
"instance": NotifyHomeAssistant,
|
||||
# Throws a series of connection and transfer exceptions when this flag
|
||||
# is set and tests that we gracfully handle them
|
||||
"test_requests_exceptions": True,
|
||||
}),
|
||||
)
|
||||
|
||||
|
||||
def test_plugin_homeassistant_urls():
|
||||
"""NotifyHomeAssistant() Apprise URLs."""
|
||||
"""
|
||||
NotifyHomeAssistant() Apprise URLs
|
||||
|
||||
"""
|
||||
|
||||
# Run our general tests
|
||||
AppriseURLTester(tests=apprise_url_tests).run_all()
|
||||
|
@ -174,7 +152,10 @@ def test_plugin_homeassistant_urls():
|
|||
|
||||
@mock.patch("requests.post")
|
||||
def test_plugin_homeassistant_general(mock_post):
|
||||
"""NotifyHomeAssistant() General Checks."""
|
||||
"""
|
||||
NotifyHomeAssistant() General Checks
|
||||
|
||||
"""
|
||||
|
||||
response = mock.Mock()
|
||||
response.content = ""
|
||||
|
@ -183,8 +164,8 @@ def test_plugin_homeassistant_general(mock_post):
|
|||
# Prepare Mock
|
||||
mock_post.return_value = response
|
||||
|
||||
# Variation Initializations
|
||||
obj = Apprise.instantiate("hassio://localhost/accesstoken")
|
||||
# Initializations
|
||||
obj = Apprise.instantiate("hassio://localhost/long.lived.token")
|
||||
assert isinstance(obj, NotifyHomeAssistant) is True
|
||||
assert isinstance(obj.url(), str) is True
|
||||
|
||||
|
@ -192,7 +173,174 @@ def test_plugin_homeassistant_general(mock_post):
|
|||
assert obj.send(body="test") is True
|
||||
|
||||
assert mock_post.call_count == 1
|
||||
assert (
|
||||
mock_post.call_args_list[0][0][0]
|
||||
== "http://localhost:8123/api/services/persistent_notification/create"
|
||||
)
|
||||
assert mock_post.call_args_list[0][0][0] == \
|
||||
"http://localhost:8123/api/services/persistent_notification/create"
|
||||
|
||||
# Reset our mock object
|
||||
mock_post.reset_mock()
|
||||
|
||||
# Now let's notify an object
|
||||
obj = Apprise.instantiate(
|
||||
"hassio://localhost/long.lived.token/service")
|
||||
assert isinstance(obj, NotifyHomeAssistant) is True
|
||||
assert isinstance(obj.url(), str) is True
|
||||
|
||||
# Send Notification
|
||||
assert obj.send(body="test") is True
|
||||
|
||||
assert mock_post.call_args_list[0][0][0] == \
|
||||
"http://localhost:8123/api/services/notify/service"
|
||||
posted_json = json.loads(mock_post.call_args_list[0][1]["data"])
|
||||
assert "notification_id" in posted_json
|
||||
assert "targets" not in posted_json
|
||||
assert "message" in posted_json
|
||||
assert posted_json["message"] == "test"
|
||||
assert "title" in posted_json
|
||||
assert posted_json["title"] == ""
|
||||
|
||||
# Reset our mock object
|
||||
mock_post.reset_mock()
|
||||
|
||||
#
|
||||
# No Batch Processing
|
||||
#
|
||||
|
||||
# Now let's notify an object
|
||||
obj = Apprise.instantiate(
|
||||
"hassio://localhost/long.lived.token/serviceA:target1,target2/"
|
||||
"service2/domain1.service3?batch=no")
|
||||
assert isinstance(obj, NotifyHomeAssistant) is True
|
||||
assert isinstance(obj.url(), str) is True
|
||||
|
||||
# Send Notification
|
||||
assert obj.send(body="test-body", title="title") is True
|
||||
|
||||
# Entries are split apart
|
||||
assert len(obj) == 4
|
||||
assert mock_post.call_count == 4
|
||||
|
||||
assert mock_post.call_args_list[0][0][0] == \
|
||||
"http://localhost:8123/api/services/domain1/service3"
|
||||
posted_json = json.loads(mock_post.call_args_list[0][1]["data"])
|
||||
assert "notification_id" in posted_json
|
||||
assert "targets" not in posted_json
|
||||
assert "message" in posted_json
|
||||
assert posted_json["message"] == "test-body"
|
||||
assert "title" in posted_json
|
||||
assert posted_json["title"] == "title"
|
||||
|
||||
assert mock_post.call_args_list[1][0][0] == \
|
||||
"http://localhost:8123/api/services/notify/service2"
|
||||
posted_json = json.loads(mock_post.call_args_list[1][1]["data"])
|
||||
assert "notification_id" in posted_json
|
||||
assert "targets" not in posted_json
|
||||
assert "message" in posted_json
|
||||
assert posted_json["message"] == "test-body"
|
||||
assert "title" in posted_json
|
||||
assert posted_json["title"] == "title"
|
||||
|
||||
assert mock_post.call_args_list[2][0][0] == \
|
||||
"http://localhost:8123/api/services/notify/serviceA"
|
||||
posted_json = json.loads(mock_post.call_args_list[2][1]["data"])
|
||||
assert "notification_id" in posted_json
|
||||
assert "targets" in posted_json
|
||||
assert isinstance(posted_json["targets"], list)
|
||||
assert len(posted_json["targets"]) == 1
|
||||
assert "target1" in posted_json["targets"]
|
||||
assert "message" in posted_json
|
||||
assert posted_json["message"] == "test-body"
|
||||
assert "title" in posted_json
|
||||
assert posted_json["title"] == "title"
|
||||
|
||||
assert mock_post.call_args_list[3][0][0] == \
|
||||
"http://localhost:8123/api/services/notify/serviceA"
|
||||
posted_json = json.loads(mock_post.call_args_list[3][1]["data"])
|
||||
assert "notification_id" in posted_json
|
||||
assert "targets" in posted_json
|
||||
assert isinstance(posted_json["targets"], list)
|
||||
assert len(posted_json["targets"]) == 1
|
||||
assert "target2" in posted_json["targets"]
|
||||
assert "message" in posted_json
|
||||
assert posted_json["message"] == "test-body"
|
||||
assert "title" in posted_json
|
||||
assert posted_json["title"] == "title"
|
||||
|
||||
# Reset our mock object
|
||||
mock_post.reset_mock()
|
||||
|
||||
#
|
||||
# Batch Processing
|
||||
#
|
||||
|
||||
# Now let's notify an object
|
||||
obj = Apprise.instantiate(
|
||||
"hassio://localhost/long.lived.token/serviceA:target1,target2/"
|
||||
"service2/domain1.service3?batch=yes")
|
||||
assert isinstance(obj, NotifyHomeAssistant) is True
|
||||
assert isinstance(obj.url(), str) is True
|
||||
|
||||
# Send Notification
|
||||
assert obj.send(body="test-body", title="title") is True
|
||||
|
||||
# Entries targets can be grouped
|
||||
assert len(obj) == 3
|
||||
assert mock_post.call_count == 3
|
||||
|
||||
assert mock_post.call_args_list[0][0][0] == \
|
||||
"http://localhost:8123/api/services/domain1/service3"
|
||||
posted_json = json.loads(mock_post.call_args_list[0][1]["data"])
|
||||
assert "notification_id" in posted_json
|
||||
assert "targets" not in posted_json
|
||||
assert "message" in posted_json
|
||||
assert posted_json["message"] == "test-body"
|
||||
assert "title" in posted_json
|
||||
assert posted_json["title"] == "title"
|
||||
|
||||
assert mock_post.call_args_list[1][0][0] == \
|
||||
"http://localhost:8123/api/services/notify/service2"
|
||||
posted_json = json.loads(mock_post.call_args_list[1][1]["data"])
|
||||
assert "notification_id" in posted_json
|
||||
assert "targets" not in posted_json
|
||||
assert "message" in posted_json
|
||||
assert posted_json["message"] == "test-body"
|
||||
assert "title" in posted_json
|
||||
assert posted_json["title"] == "title"
|
||||
|
||||
assert mock_post.call_args_list[2][0][0] == \
|
||||
"http://localhost:8123/api/services/notify/serviceA"
|
||||
posted_json = json.loads(mock_post.call_args_list[2][1]["data"])
|
||||
assert "notification_id" in posted_json
|
||||
assert "targets" in posted_json
|
||||
assert isinstance(posted_json["targets"], list)
|
||||
# Our batch groups our targets
|
||||
assert len(posted_json["targets"]) == 2
|
||||
assert "target1" in posted_json["targets"]
|
||||
assert "target2" in posted_json["targets"]
|
||||
assert "message" in posted_json
|
||||
assert posted_json["message"] == "test-body"
|
||||
assert "title" in posted_json
|
||||
assert posted_json["title"] == "title"
|
||||
|
||||
# Reset our mock object
|
||||
mock_post.reset_mock()
|
||||
|
||||
#
|
||||
# Test error handling on multi-query request
|
||||
#
|
||||
|
||||
# Now let's notify an object
|
||||
obj = Apprise.instantiate(
|
||||
"hassio://localhost/long.lived.token/serviceA:target1,target2/"
|
||||
"service2:target3,target4,target5,target6?batch=no")
|
||||
|
||||
assert isinstance(obj, NotifyHomeAssistant) is True
|
||||
assert isinstance(obj.url(), str) is True
|
||||
|
||||
bad_response = mock.Mock()
|
||||
bad_response.content = ""
|
||||
bad_response.status_code = requests.codes.not_found
|
||||
|
||||
mock_post.side_effect = (response, bad_response)
|
||||
|
||||
# We will fail on our second message sent
|
||||
assert obj.send(body="test-body", title="title") is False
|
||||
|
|
Loading…
Reference in New Issue