added service.url(), apprise.urls(), iter support and pop()

pull/74/head
Chris Caron 2019-02-13 17:48:09 -05:00
parent b9be82c1b2
commit 0d56da9ac8
40 changed files with 1087 additions and 280 deletions

View File

@ -39,7 +39,7 @@ The table below identifies the services this tool supports and some example serv
| [Faast](https://github.com/caronc/apprise/wiki/Notify_faast) | faast:// | (TCP) 443 | faast://authorizationtoken | [Faast](https://github.com/caronc/apprise/wiki/Notify_faast) | faast:// | (TCP) 443 | faast://authorizationtoken
| [Gnome](https://github.com/caronc/apprise/wiki/Notify_gnome) | gnome:// | n/a | gnome:// | [Gnome](https://github.com/caronc/apprise/wiki/Notify_gnome) | gnome:// | n/a | gnome://
| [Growl](https://github.com/caronc/apprise/wiki/Notify_growl) | growl:// | (UDP) 23053 | growl://hostname<br />growl://hostname:portno<br />growl://password@hostname<br />growl://password@hostname:port</br>**Note**: you can also use the get parameter _version_ which can allow the growl request to behave using the older v1.x protocol. An example would look like: growl://hostname?version=1 | [Growl](https://github.com/caronc/apprise/wiki/Notify_growl) | growl:// | (UDP) 23053 | growl://hostname<br />growl://hostname:portno<br />growl://password@hostname<br />growl://password@hostname:port</br>**Note**: you can also use the get parameter _version_ which can allow the growl request to behave using the older v1.x protocol. An example would look like: growl://hostname?version=1
| [IFTTT](https://github.com/caronc/apprise/wiki/Notify_ifttt) | ifttt:// | (TCP) 443 | ifttt://webhooksID/EventToTrigger<br />ifttt://webhooksID/EventToTrigger/Value1/Value2/Value3<br />ifttt://webhooksID/EventToTrigger/?Value3=NewEntry&Value2=AnotherValue | [IFTTT](https://github.com/caronc/apprise/wiki/Notify_ifttt) | ifttt:// | (TCP) 443 | ifttt://webhooksID/EventToTrigger<br />ifttt://webhooksID/EventToTrigger/Event2/EventN
| [Join](https://github.com/caronc/apprise/wiki/Notify_join) | join:// | (TCP) 443 | join://apikey/device<br />join://apikey/device1/device2/deviceN/<br />join://apikey/group<br />join://apikey/groupA/groupB/groupN<br />join://apikey/DeviceA/groupA/groupN/DeviceN/ | [Join](https://github.com/caronc/apprise/wiki/Notify_join) | join:// | (TCP) 443 | join://apikey/device<br />join://apikey/device1/device2/deviceN/<br />join://apikey/group<br />join://apikey/groupA/groupB/groupN<br />join://apikey/DeviceA/groupA/groupN/DeviceN/
| [KODI](https://github.com/caronc/apprise/wiki/Notify_kodi) | kodi:// or kodis:// | (TCP) 8080 or 443 | kodi://hostname<br />kodi://user@hostname<br />kodi://user:password@hostname:port | [KODI](https://github.com/caronc/apprise/wiki/Notify_kodi) | kodi:// or kodis:// | (TCP) 8080 or 443 | kodi://hostname<br />kodi://user@hostname<br />kodi://user:password@hostname:port
| [Matrix](https://github.com/caronc/apprise/wiki/Notify_matrix) | matrix:// or matrixs:// | (TCP) 80 or 443 | matrix://token<br />matrix://user@token<br />matrixs://token?mode=slack<br />matrixs://user@token | [Matrix](https://github.com/caronc/apprise/wiki/Notify_matrix) | matrix:// or matrixs:// | (TCP) 80 or 443 | matrix://token<br />matrix://user@token<br />matrixs://token?mode=slack<br />matrixs://user@token

View File

@ -171,7 +171,7 @@ class Apprise(object):
# URL information # URL information
plugin = SCHEMA_MAP[results['schema']](**results) plugin = SCHEMA_MAP[results['schema']](**results)
except: except Exception:
# the arguments are invalid or can not be used. # the arguments are invalid or can not be used.
logger.error('Could not load URL: %s' % url) logger.error('Could not load URL: %s' % url)
return None return None
@ -432,6 +432,33 @@ class Apprise(object):
return response return response
def urls(self):
"""
Returns all of the loaded URLs defined in this apprise object.
"""
return [x.url() for x in self.servers]
def pop(self, index):
"""
Removes an indexed Notification Service from the stack and
returns it.
"""
# Remove our entry
return self.servers.pop(index)
def __getitem__(self, index):
"""
Returns the indexed server entry of a loaded notification server
"""
return self.servers[index]
def __iter__(self):
"""
Returns an iterator to our server list
"""
return iter(self.servers)
def __len__(self): def __len__(self):
""" """
Returns the number of servers loaded Returns the number of servers loaded

View File

@ -260,6 +260,14 @@ class NotifyBase(object):
color_type=color_type, color_type=color_type,
) )
def url(self):
"""
Assembles the URL associated with the notification based on the
arguments provied.
"""
raise NotImplementedError("This is implimented by the child class.")
def __contains__(self, tags): def __contains__(self, tags):
""" """
Returns true if the tag specified is associated with this notification. Returns true if the tag specified is associated with this notification.
@ -347,16 +355,20 @@ class NotifyBase(object):
""" """
common urlencode function common urlencode function
The query should always be a dictionary.
""" """
# Tidy query by eliminating any records set to None
_query = {k: v for (k, v) in query.items() if v is not None}
try: try:
# Python v3.x # Python v3.x
return _urlencode( return _urlencode(
query, doseq=doseq, safe=safe, encoding=encoding, _query, doseq=doseq, safe=safe, encoding=encoding,
errors=errors) errors=errors)
except TypeError: except TypeError:
# Python v2.7 # Python v2.7
return _urlencode(query) return _urlencode(_query)
@staticmethod @staticmethod
def split_path(path, unquote=True): def split_path(path, unquote=True):

View File

@ -29,6 +29,7 @@ import re
from time import time from time import time
import hmac import hmac
from hashlib import sha1 from hashlib import sha1
from itertools import chain
try: try:
from urlparse import urlparse from urlparse import urlparse
@ -272,6 +273,26 @@ class NotifyBoxcar(NotifyBase):
return True return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format
}
return '{schema}://{access}/{secret}/{recipients}/?{args}'.format(
schema=self.secure_protocol,
access=self.quote(self.access),
secret=self.quote(self.secret),
recipients='/'.join([
self.quote(x) for x in chain(
self.tags, self.device_tokens) if x != DEFAULT_TAG]),
args=self.urlencode(args),
)
@staticmethod @staticmethod
def parse_url(url): def parse_url(url):
""" """

View File

@ -287,6 +287,13 @@ class NotifyDBus(NotifyBase):
return True return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
return '{schema}://'.format(schema=self.schema)
@staticmethod @staticmethod
def parse_url(url): def parse_url(url):
""" """

View File

@ -180,8 +180,8 @@ class NotifyDiscord(NotifyBase):
else: else:
# not markdown # not markdown
payload['content'] = body if not title \ payload['content'] = \
else "{}\r\n{}".format(title, body) body if not title else "{}\r\n{}".format(title, body)
if self.avatar and image_url: if self.avatar and image_url:
payload['avatar_url'] = image_url payload['avatar_url'] = image_url
@ -241,6 +241,27 @@ class NotifyDiscord(NotifyBase):
return True return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'tts': 'yes' if self.tts else 'no',
'avatar': 'yes' if self.avatar else 'no',
'footer': 'yes' if self.footer else 'no',
'thumbnail': 'yes' if self.thumbnail else 'no',
}
return '{schema}://{webhook_id}/{webhook_token}/?{args}'.format(
schema=self.secure_protocol,
webhook_id=self.quote(self.webhook_id),
webhook_token=self.quote(self.webhook_token),
args=self.urlencode(args),
)
@staticmethod @staticmethod
def parse_url(url): def parse_url(url):
""" """

View File

@ -421,6 +421,52 @@ class NotifyEmail(NotifyBase):
return True return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'to': self.to_addr,
'from': self.from_addr,
'name': self.from_name,
'mode': self.secure_mode,
'smtp': self.smtp_host,
'timeout': self.timeout,
'user': self.user,
}
# pull email suffix from username (if present)
user = self.user.split('@')[0]
# Determine Authentication
auth = ''
if self.user and self.password:
auth = '{user}:{password}@'.format(
user=self.quote(user, safe=''),
password=self.quote(self.password, safe=''),
)
else:
# user url
auth = '{user}@'.format(
user=self.quote(user, safe=''),
)
# Default Port setup
default_port = \
self.default_secure_port if self.secure else self.default_port
return '{schema}://{auth}{hostname}{port}/?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=self.host,
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
args=self.urlencode(args),
)
@staticmethod @staticmethod
def parse_url(url): def parse_url(url):
""" """

View File

@ -535,6 +535,38 @@ class NotifyEmby(NotifyBase):
return not has_error return not has_error
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'modal': 'yes' if self.modal else 'no',
}
# Determine Authentication
auth = ''
if self.user and self.password:
auth = '{user}:{password}@'.format(
user=self.quote(self.user, safe=''),
password=self.quote(self.password, safe=''),
)
else: # self.user is set
auth = '{user}@'.format(
user=self.quote(self.user, safe=''),
)
return '{schema}://{auth}{hostname}{port}/?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=self.host,
port='' if self.port is None or self.port == self.emby_default_port
else ':{}'.format(self.port),
args=self.urlencode(args),
)
@property @property
def is_authenticated(self): def is_authenticated(self):
""" """

View File

@ -124,6 +124,22 @@ class NotifyFaast(NotifyBase):
return True return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
}
return '{schema}://{authtoken}/?{args}'.format(
schema=self.protocol,
authtoken=self.quote(self.authtoken, safe=''),
args=self.urlencode(args),
)
@staticmethod @staticmethod
def parse_url(url): def parse_url(url):
""" """

View File

@ -164,6 +164,13 @@ class NotifyGnome(NotifyBase):
return True return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
return '{schema}://'.format(schema=self.protocol)
@staticmethod @staticmethod
def parse_url(url): def parse_url(url):
""" """

View File

@ -207,6 +207,43 @@ class NotifyGrowl(NotifyBase):
return True return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
_map = {
GrowlPriority.LOW: 'low',
GrowlPriority.MODERATE: 'moderate',
GrowlPriority.NORMAL: 'normal',
GrowlPriority.HIGH: 'high',
GrowlPriority.EMERGENCY: 'emergency',
}
# Define any arguments set
args = {
'format': self.notify_format,
'priority':
_map[GrowlPriority.NORMAL] if self.priority not in _map
else _map[self.priority],
'version': self.version,
}
auth = ''
if self.password:
auth = '{password}@'.format(
password=self.quote(self.user, safe=''),
)
return '{schema}://{auth}{hostname}{port}/?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=self.host,
port='' if self.port is None or self.port == self.default_port
else ':{}'.format(self.port),
args=self.urlencode(args),
)
@staticmethod @staticmethod
def parse_url(url): def parse_url(url):
""" """
@ -239,15 +276,10 @@ class NotifyGrowl(NotifyBase):
if 'priority' in results['qsd'] and len(results['qsd']['priority']): if 'priority' in results['qsd'] and len(results['qsd']['priority']):
_map = { _map = {
'l': GrowlPriority.LOW, 'l': GrowlPriority.LOW,
'-2': GrowlPriority.LOW,
'm': GrowlPriority.MODERATE, 'm': GrowlPriority.MODERATE,
'-1': GrowlPriority.MODERATE,
'n': GrowlPriority.NORMAL, 'n': GrowlPriority.NORMAL,
'0': GrowlPriority.NORMAL,
'h': GrowlPriority.HIGH, 'h': GrowlPriority.HIGH,
'1': GrowlPriority.HIGH,
'e': GrowlPriority.EMERGENCY, 'e': GrowlPriority.EMERGENCY,
'2': GrowlPriority.EMERGENCY,
} }
try: try:
results['priority'] = \ results['priority'] = \

View File

@ -34,7 +34,7 @@
# URL. For example, it might look like this: # URL. For example, it might look like this:
# https://maker.ifttt.com/use/a3nHB7gA9TfBQSqJAHklod # https://maker.ifttt.com/use/a3nHB7gA9TfBQSqJAHklod
# #
# In the above example a3nHB7gA9TfBQSqJAHklod becomes your {apikey} # In the above example a3nHB7gA9TfBQSqJAHklod becomes your {webhook_id}
# You will need this to make this notification work correctly # You will need this to make this notification work correctly
# #
# For each event you create you will assign it a name (this will be known as # For each event you create you will assign it a name (this will be known as
@ -44,6 +44,7 @@ import requests
from json import dumps from json import dumps
from .NotifyBase import NotifyBase from .NotifyBase import NotifyBase
from .NotifyBase import HTTP_ERROR_MAP from .NotifyBase import HTTP_ERROR_MAP
from ..utils import parse_list
class NotifyIFTTT(NotifyBase): class NotifyIFTTT(NotifyBase):
@ -59,7 +60,7 @@ class NotifyIFTTT(NotifyBase):
service_url = 'https://ifttt.com/' service_url = 'https://ifttt.com/'
# The default protocol # The default protocol
protocol = 'ifttt' secure_protocol = 'ifttt'
# A URL that takes you to the setup/help of the specific protocol # A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_ifttt' setup_url = 'https://github.com/caronc/apprise/wiki/Notify_ifttt'
@ -87,35 +88,28 @@ class NotifyIFTTT(NotifyBase):
ifttt_default_type_key = 'value3' ifttt_default_type_key = 'value3'
# IFTTT uses the http protocol with JSON requests # IFTTT uses the http protocol with JSON requests
notify_url = 'https://maker.ifttt.com/trigger/{event}/with/key/{apikey}' notify_url = 'https://maker.ifttt.com/' \
'trigger/{event}/with/key/{webhook_id}'
def __init__(self, apikey, event, event_args=None, **kwargs): def __init__(self, webhook_id, events, **kwargs):
""" """
Initialize IFTTT Object Initialize IFTTT Object
""" """
super(NotifyIFTTT, self).__init__(**kwargs) super(NotifyIFTTT, self).__init__(**kwargs)
if not apikey: if not webhook_id:
raise TypeError('You must specify the Webhooks apikey.') raise TypeError('You must specify the Webhooks webhook_id.')
if not event: # Store our Events we wish to trigger
raise TypeError('You must specify the Event you wish to trigger.') self.events = parse_list(events)
if not self.events:
raise TypeError(
'You must specify at least one event you wish to trigger on.')
# Store our APIKey # Store our APIKey
self.apikey = apikey self.webhook_id = webhook_id
# Store our Event we wish to trigger
self.event = event
if isinstance(event_args, dict):
# Make a copy of the arguments so that they can't change
# outside of this plugin
self.event_args = event_args.copy()
else:
# Force a dictionary
self.event_args = dict()
def notify(self, title, body, notify_type, **kwargs): def notify(self, title, body, notify_type, **kwargs):
""" """
@ -134,72 +128,101 @@ class NotifyIFTTT(NotifyBase):
self.ifttt_default_type_key: notify_type, self.ifttt_default_type_key: notify_type,
} }
# Update our payload using any other event_args specified
payload.update(self.event_args)
# Eliminate empty fields; users wishing to cancel the use of the # Eliminate empty fields; users wishing to cancel the use of the
# self.ifttt_default_ entries can preset these keys to being # self.ifttt_default_ entries can preset these keys to being
# empty so that they get caught here and removed. # empty so that they get caught here and removed.
payload = {x: y for x, y in payload.items() if y} payload = {x: y for x, y in payload.items() if y}
# URL to transmit content via # Track our failures
url = self.notify_url.format( error_count = 0
apikey=self.apikey,
event=self.event, # Create a copy of our event lit
events = list(self.events)
while len(events):
# Retrive an entry off of our event list
event = events.pop(0)
# URL to transmit content via
url = self.notify_url.format(
webhook_id=self.webhook_id,
event=event,
)
self.logger.debug('IFTTT POST URL: %s (cert_verify=%r)' % (
url, self.verify_certificate,
))
self.logger.debug('IFTTT Payload: %s' % str(payload))
try:
r = requests.post(
url,
data=dumps(payload),
headers=headers,
verify=self.verify_certificate,
)
self.logger.debug(
u"IFTTT HTTP response status: %r" % r.status_code)
self.logger.debug(
u"IFTTT HTTP response headers: %r" % r.headers)
self.logger.debug(
u"IFTTT HTTP response body: %r" % r.content)
if r.status_code != requests.codes.ok:
# We had a problem
try:
self.logger.warning(
'Failed to send IFTTT:%s '
'notification: %s (error=%s).' % (
event,
HTTP_ERROR_MAP[r.status_code],
r.status_code))
except KeyError:
self.logger.warning(
'Failed to send IFTTT:%s '
'notification (error=%s).' % (
event, r.status_code))
# self.logger.debug('Response Details: %s' % r.content)
error_count += 1
else:
self.logger.info(
'Sent IFTTT notification to Event %s.' % event)
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending IFTTT:%s ' % (
event) + 'notification.'
)
self.logger.debug('Socket Exception: %s' % str(e))
error_count += 1
finally:
if len(events):
# Prevent thrashing requests
self.throttle()
return (error_count == 0)
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
}
return '{schema}://{webhook_id}@{events}/?{args}'.format(
schema=self.secure_protocol,
webhook_id=self.webhook_id,
events='/'.join([self.quote(x, safe='') for x in self.events]),
args=self.urlencode(args),
) )
self.logger.debug('IFTTT POST URL: %s (cert_verify=%r)' % (
url, self.verify_certificate,
))
self.logger.debug('IFTTT Payload: %s' % str(payload))
try:
r = requests.post(
url,
data=dumps(payload),
headers=headers,
verify=self.verify_certificate,
)
self.logger.debug(
u"IFTTT HTTP response status: %r" % r.status_code)
self.logger.debug(
u"IFTTT HTTP response headers: %r" % r.headers)
self.logger.debug(
u"IFTTT HTTP response body: %r" % r.content)
if r.status_code != requests.codes.ok:
# We had a problem
try:
self.logger.warning(
'Failed to send IFTTT:%s '
'notification: %s (error=%s).' % (
self.event,
HTTP_ERROR_MAP[r.status_code],
r.status_code))
except KeyError:
self.logger.warning(
'Failed to send IFTTT:%s '
'notification (error=%s).' % (
self.event,
r.status_code))
# self.logger.debug('Response Details: %s' % r.content)
return False
else:
self.logger.info(
'Sent IFTTT notification to Event %s.' % self.event)
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending IFTTT:%s ' % (
self.event) + 'notification.'
)
self.logger.debug('Socket Exception: %s' % str(e))
return False
return True
@staticmethod @staticmethod
def parse_url(url): def parse_url(url):
""" """
@ -214,22 +237,14 @@ class NotifyIFTTT(NotifyBase):
return results return results
# Our Event # Our Event
results['event'] = results['host'] results['events'] = list()
results['events'].append(results['host'])
# Our API Key # Our API Key
results['apikey'] = results['user'] results['webhook_id'] = results['user']
# Store ValueX entries based on each entry past the host # Now fetch the remaining tokens
results['event_args'] = { results['events'].extend([x for x in filter(
'{0}{1}'.format(NotifyIFTTT.ifttt_default_key_prefix, n + 1): bool, NotifyBase.split_path(results['fullpath']))][0:])
NotifyBase.unquote(x)
for n, x in enumerate(
NotifyBase.split_path(results['fullpath'])) if x}
# Allow users to set key=val parameters to specify more types
# of payload options
results['event_args'].update(
{k: NotifyBase.unquote(v)
for k, v in results['qsd'].items()})
return results return results

View File

@ -70,6 +70,39 @@ class NotifyJSON(NotifyBase):
return return
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
}
# Determine Authentication
auth = ''
if self.user and self.password:
auth = '{user}:{password}@'.format(
user=self.quote(self.user, safe=''),
password=self.quote(self.password, safe=''),
)
elif self.user:
auth = '{user}@'.format(
user=self.quote(self.user, safe=''),
)
default_port = 443 if self.secure else 80
return '{schema}://{auth}{hostname}{port}/?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=self.host,
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
args=self.urlencode(args),
)
def notify(self, title, body, notify_type, **kwargs): def notify(self, title, body, notify_type, **kwargs):
""" """
Perform JSON Notification Perform JSON Notification

View File

@ -78,7 +78,7 @@ class NotifyJoin(NotifyBase):
service_url = 'https://joaoapps.com/join/' service_url = 'https://joaoapps.com/join/'
# The default protocol # The default protocol
protocol = 'join' secure_protocol = 'join'
# A URL that takes you to the setup/help of the specific protocol # A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_join' setup_url = 'https://github.com/caronc/apprise/wiki/Notify_join'
@ -233,6 +233,22 @@ class NotifyJoin(NotifyBase):
return return_status return return_status
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
}
return '{schema}://{apikey}/{devices}/?{args}'.format(
schema=self.secure_protocol,
apikey=self.quote(self.apikey, safe=''),
devices='/'.join([self.quote(x) for x in self.devices]),
args=self.urlencode(args))
@staticmethod @staticmethod
def parse_url(url): def parse_url(url):
""" """

View File

@ -112,7 +112,6 @@ class NotifyMatrix(NotifyBase):
if not self.user: if not self.user:
self.logger.warning( self.logger.warning(
'No user was specified; using %s.' % MATRIX_DEFAULT_USER) 'No user was specified; using %s.' % MATRIX_DEFAULT_USER)
self.user = MATRIX_DEFAULT_USER
if mode not in MATRIX_NOTIFICATION_MODES: if mode not in MATRIX_NOTIFICATION_MODES:
self.logger.warning('The mode specified (%s) is invalid.' % mode) self.logger.warning('The mode specified (%s) is invalid.' % mode)
@ -209,7 +208,7 @@ class NotifyMatrix(NotifyBase):
def __slack_mode_payload(self, title, body, notify_type): def __slack_mode_payload(self, title, body, notify_type):
# prepare JSON Object # prepare JSON Object
payload = { payload = {
'username': self.user, 'username': self.user if self.user else MATRIX_DEFAULT_USER,
# Use Markdown language # Use Markdown language
'mrkdwn': True, 'mrkdwn': True,
'attachments': [{ 'attachments': [{
@ -230,13 +229,43 @@ class NotifyMatrix(NotifyBase):
msg = '<h4>%s</h4>%s<br/>' % (title, body) msg = '<h4>%s</h4>%s<br/>' % (title, body)
payload = { payload = {
'displayName': self.user, 'displayName': self.user if self.user else MATRIX_DEFAULT_USER,
'format': 'html', 'format': 'html',
'text': msg, 'text': msg,
} }
return payload return payload
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'mode': self.mode,
}
# Determine Authentication
auth = ''
if self.user:
auth = '{user}@'.format(
user=self.quote(self.user, safe=''),
)
default_port = 443 if self.secure else 80
return '{schema}://{auth}{host}/{token}{port}/?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
host=self.host,
auth=auth,
token=self.token,
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
args=self.urlencode(args),
)
@staticmethod @staticmethod
def parse_url(url): def parse_url(url):
""" """

View File

@ -179,6 +179,28 @@ class NotifyMatterMost(NotifyBase):
return True return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
}
default_port = 443 if self.secure else self.default_port
default_schema = self.secure_protocol if self.secure else self.protocol
return '{schema}://{hostname}{port}/{authtoken}/?{args}'.format(
schema=default_schema,
hostname=self.host,
port='' if not self.port or self.port == default_port
else ':{}'.format(self.port),
authtoken=self.quote(self.authtoken, safe=''),
args=self.urlencode(args),
)
@staticmethod @staticmethod
def parse_url(url): def parse_url(url):
""" """

View File

@ -190,6 +190,34 @@ class NotifyProwl(NotifyBase):
return True return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
_map = {
ProwlPriority.LOW: 'low',
ProwlPriority.MODERATE: 'moderate',
ProwlPriority.NORMAL: 'normal',
ProwlPriority.HIGH: 'high',
ProwlPriority.EMERGENCY: 'emergency',
}
# Define any arguments set
args = {
'format': self.notify_format,
'priority': 'normal' if self.priority not in _map
else _map[self.priority]
}
return '{schema}://{apikey}/{providerkey}/?{args}'.format(
schema=self.secure_protocol,
apikey=self.quote(self.apikey, safe=''),
providerkey='' if not self.providerkey
else self.quote(self.providerkey, safe=''),
args=self.urlencode(args),
)
@staticmethod @staticmethod
def parse_url(url): def parse_url(url):
""" """
@ -216,15 +244,10 @@ class NotifyProwl(NotifyBase):
if 'priority' in results['qsd'] and len(results['qsd']['priority']): if 'priority' in results['qsd'] and len(results['qsd']['priority']):
_map = { _map = {
'l': ProwlPriority.LOW, 'l': ProwlPriority.LOW,
'-2': ProwlPriority.LOW,
'm': ProwlPriority.MODERATE, 'm': ProwlPriority.MODERATE,
'-1': ProwlPriority.MODERATE,
'n': ProwlPriority.NORMAL, 'n': ProwlPriority.NORMAL,
'0': ProwlPriority.NORMAL,
'h': ProwlPriority.HIGH, 'h': ProwlPriority.HIGH,
'1': ProwlPriority.HIGH,
'e': ProwlPriority.EMERGENCY, 'e': ProwlPriority.EMERGENCY,
'2': ProwlPriority.EMERGENCY,
} }
try: try:
results['priority'] = \ results['priority'] = \

View File

@ -182,6 +182,28 @@ class NotifyPushBullet(NotifyBase):
return not has_error return not has_error
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
}
recipients = '/'.join([self.quote(x) for x in self.recipients])
if recipients == PUSHBULLET_SEND_TO_ALL:
# keyword is reserved for internal usage only; it's safe to remove
# it from the recipients list
recipients = ''
return '{schema}://{accesstoken}/{recipients}/?{args}'.format(
schema=self.secure_protocol,
accesstoken=self.quote(self.accesstoken, safe=''),
recipients=recipients,
args=self.urlencode(args))
@staticmethod @staticmethod
def parse_url(url): def parse_url(url):
""" """

View File

@ -26,6 +26,7 @@
import re import re
import requests import requests
from json import dumps from json import dumps
from itertools import chain
from .NotifyBase import NotifyBase from .NotifyBase import NotifyBase
from .NotifyBase import HTTP_ERROR_MAP from .NotifyBase import HTTP_ERROR_MAP
@ -256,6 +257,29 @@ class NotifyPushed(NotifyBase):
return True return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
}
return '{schema}://{app_key}/{app_secret}/{targets}/?{args}'.format(
schema=self.secure_protocol,
app_key=self.quote(self.app_key, safe=''),
app_secret=self.quote(self.app_secret, safe=''),
targets='/'.join(
[self.quote(x) for x in chain(
# Channels are prefixed with a pound/hashtag symbol
['#{}'.format(x) for x in self.channels],
# Users are prefixed with an @ symbol
['@{}'.format(x) for x in self.users],
)]),
args=self.urlencode(args))
@staticmethod @staticmethod
def parse_url(url): def parse_url(url):
""" """

View File

@ -52,12 +52,15 @@ class NotifyPushjet(NotifyBase):
# A URL that takes you to the setup/help of the specific protocol # A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_pushjet' setup_url = 'https://github.com/caronc/apprise/wiki/Notify_pushjet'
def __init__(self, **kwargs): def __init__(self, secret_key, **kwargs):
""" """
Initialize Pushjet Object Initialize Pushjet Object
""" """
super(NotifyPushjet, self).__init__(**kwargs) super(NotifyPushjet, self).__init__(**kwargs)
# store our key
self.secret_key = secret_key
def notify(self, title, body, notify_type): def notify(self, title, body, notify_type):
""" """
Perform Pushjet Notification Perform Pushjet Notification
@ -72,7 +75,7 @@ class NotifyPushjet(NotifyBase):
server += ":" + str(self.port) server += ":" + str(self.port)
api = pushjet.Api(server) api = pushjet.Api(server)
service = api.Service(secret_key=self.user) service = api.Service(secret_key=self.secret_key)
service.send(body, title) service.send(body, title)
self.logger.info('Sent Pushjet notification.') self.logger.info('Sent Pushjet notification.')
@ -84,6 +87,27 @@ class NotifyPushjet(NotifyBase):
return True return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
}
default_port = 443 if self.secure else 80
return '{schema}://{secret_key}@{hostname}{port}/?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
secret_key=self.quote(self.secret_key, safe=''),
hostname=self.host,
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
args=self.urlencode(args),
)
@staticmethod @staticmethod
def parse_url(url): def parse_url(url):
""" """
@ -91,10 +115,10 @@ class NotifyPushjet(NotifyBase):
us to substantiate this object. us to substantiate this object.
Syntax: Syntax:
pjet://secret@hostname pjet://secret_key@hostname
pjet://secret@hostname:port pjet://secret_key@hostname:port
pjets://secret@hostname pjets://secret_key@hostname
pjets://secret@hostname:port pjets://secret_key@hostname:port
""" """
results = NotifyBase.parse_url(url) results = NotifyBase.parse_url(url)
@ -107,4 +131,7 @@ class NotifyPushjet(NotifyBase):
# a username is required # a username is required
return None return None
# Store it as it's value
results['secret_key'] = results.get('user')
return results return results

View File

@ -237,6 +237,41 @@ class NotifyPushover(NotifyBase):
return not has_error return not has_error
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
_map = {
PushoverPriority.LOW: 'low',
PushoverPriority.MODERATE: 'moderate',
PushoverPriority.NORMAL: 'normal',
PushoverPriority.HIGH: 'high',
PushoverPriority.EMERGENCY: 'emergency',
}
# Define any arguments set
args = {
'format': self.notify_format,
'priority':
_map[PushoverPriority.NORMAL] if self.priority not in _map
else _map[self.priority],
}
devices = '/'.join([self.quote(x) for x in self.devices])
if devices == PUSHOVER_SEND_TO_ALL:
# keyword is reserved for internal usage only; it's safe to remove
# it from the devices list
devices = ''
return '{schema}://{auth}{token}/{devices}/?{args}'.format(
schema=self.secure_protocol,
auth='' if not self.user
else '{user}@'.format(user=self.quote(self.user, safe='')),
token=self.quote(self.token, safe=''),
devices=devices,
args=self.urlencode(args))
@staticmethod @staticmethod
def parse_url(url): def parse_url(url):
""" """

View File

@ -26,6 +26,7 @@
import re import re
import requests import requests
from json import loads from json import loads
from itertools import chain
from .NotifyBase import NotifyBase from .NotifyBase import NotifyBase
from .NotifyBase import HTTP_ERROR_MAP from .NotifyBase import HTTP_ERROR_MAP
@ -106,6 +107,12 @@ class NotifyRocketChat(NotifyBase):
elif not isinstance(recipients, (set, tuple, list)): elif not isinstance(recipients, (set, tuple, list)):
recipients = [] recipients = []
if not (self.user and self.password):
# Username & Password is required for Rocket Chat to work
raise TypeError(
'No Rocket.Chat user/pass combo specified.'
)
# Validate recipients and drop bad ones: # Validate recipients and drop bad ones:
for recipient in recipients: for recipient in recipients:
result = IS_CHANNEL.match(recipient) result = IS_CHANNEL.match(recipient)
@ -133,6 +140,40 @@ class NotifyRocketChat(NotifyBase):
# Used to track token headers upon authentication (if successful) # Used to track token headers upon authentication (if successful)
self.headers = {} self.headers = {}
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
}
# Determine Authentication
auth = '{user}:{password}@'.format(
user=self.quote(self.user, safe=''),
password=self.quote(self.password, safe=''),
)
default_port = 443 if self.secure else 80
return '{schema}://{auth}{hostname}{port}/{targets}/?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=self.host,
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
targets='/'.join(
[self.quote(x) for x in chain(
# Channels are prefixed with a pound/hashtag symbol
['#{}'.format(x) for x in self.channels],
# Rooms are as is
self.rooms,
)]),
args=self.urlencode(args),
)
def notify(self, title, body, notify_type, **kwargs): def notify(self, title, body, notify_type, **kwargs):
""" """
wrapper to send_notification since we can alert more then one channel wrapper to send_notification since we can alert more then one channel

View File

@ -221,6 +221,32 @@ class NotifyRyver(NotifyBase):
return True return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'webhook': self.webhook,
}
# Determine if there is a botname present
botname = ''
if self.user:
botname = '{botname}@'.format(
botname=self.quote(self.user, safe=''),
)
return '{schema}://{botname}{organization}/{token}/?{args}'.format(
schema=self.secure_protocol,
botname=botname,
organization=self.quote(self.organization, safe=''),
token=self.quote(self.token, safe=''),
args=self.urlencode(args),
)
@staticmethod @staticmethod
def parse_url(url): def parse_url(url):
""" """

View File

@ -31,6 +31,7 @@ from hashlib import sha256
from datetime import datetime from datetime import datetime
from collections import OrderedDict from collections import OrderedDict
from xml.etree import ElementTree from xml.etree import ElementTree
from itertools import chain
from .NotifyBase import NotifyBase from .NotifyBase import NotifyBase
from .NotifyBase import HTTP_ERROR_MAP from .NotifyBase import HTTP_ERROR_MAP
@ -58,8 +59,8 @@ LIST_DELIM = re.compile(r'[ \t\r\n,\\/]+')
# region as a delimiter. This is a bit hacky; but it's much easier than having # region as a delimiter. This is a bit hacky; but it's much easier than having
# users of this product search though this Access Key Secret and escape all # users of this product search though this Access Key Secret and escape all
# of the forward slashes! # of the forward slashes!
IS_REGION = re.compile(r'^\s*(?P<country>[a-z]{2})-' IS_REGION = re.compile(
r'(?P<area>[a-z]+)-(?P<no>[0-9]+)\s*$', re.I) r'^\s*(?P<country>[a-z]{2})-(?P<area>[a-z]+)-(?P<no>[0-9]+)\s*$', re.I)
# Extend HTTP Error Messages # Extend HTTP Error Messages
AWS_HTTP_ERROR_MAP = HTTP_ERROR_MAP.copy() AWS_HTTP_ERROR_MAP = HTTP_ERROR_MAP.copy()
@ -521,6 +522,32 @@ class NotifySNS(NotifyBase):
return response return response
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
}
return '{schema}://{key_id}/{key_secret}/{region}/{targets}/'\
'?{args}'.format(
schema=self.secure_protocol,
key_id=self.quote(self.aws_access_key_id, safe=''),
key_secret=self.quote(self.aws_secret_access_key, safe=''),
region=self.quote(self.aws_region_name, safe=''),
targets='/'.join(
[self.quote(x) for x in chain(
# Phone # are prefixed with a plus symbol
['+{}'.format(x) for x in self.phone],
# Topics are prefixed with a pound/hashtag symbol
['#{}'.format(x) for x in self.topics],
)]),
args=self.urlencode(args),
)
@staticmethod @staticmethod
def parse_url(url): def parse_url(url):
""" """

View File

@ -141,7 +141,6 @@ class NotifySlack(NotifyBase):
if not self.user: if not self.user:
self.logger.warning( self.logger.warning(
'No user was specified; using %s.' % SLACK_DEFAULT_USER) 'No user was specified; using %s.' % SLACK_DEFAULT_USER)
self.user = SLACK_DEFAULT_USER
if compat_is_basestring(channels): if compat_is_basestring(channels):
self.channels = [x for x in filter(bool, CHANNEL_LIST_DELIM.split( self.channels = [x for x in filter(bool, CHANNEL_LIST_DELIM.split(
@ -231,7 +230,7 @@ class NotifySlack(NotifyBase):
# prepare JSON Object # prepare JSON Object
payload = { payload = {
'channel': _channel, 'channel': _channel,
'username': self.user, 'username': self.user if self.user else SLACK_DEFAULT_USER,
# Use Markdown language # Use Markdown language
'mrkdwn': True, 'mrkdwn': True,
'attachments': [{ 'attachments': [{
@ -297,6 +296,35 @@ class NotifySlack(NotifyBase):
return notify_okay return notify_okay
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
}
# Determine if there is a botname present
botname = ''
if self.user:
botname = '{botname}@'.format(
botname=self.quote(self.user, safe=''),
)
return '{schema}://{botname}{token_a}/{token_b}/{token_c}/{targets}/'\
'?{args}'.format(
schema=self.secure_protocol,
botname=botname,
token_a=self.quote(self.token_a, safe=''),
token_b=self.quote(self.token_b, safe=''),
token_c=self.quote(self.token_c, safe=''),
targets='/'.join(
[self.quote(x, safe='') for x in self.channels]),
args=self.urlencode(args),
)
@staticmethod @staticmethod
def parse_url(url): def parse_url(url):
""" """

View File

@ -60,8 +60,8 @@ from json import dumps
from .NotifyBase import NotifyBase from .NotifyBase import NotifyBase
from .NotifyBase import HTTP_ERROR_MAP from .NotifyBase import HTTP_ERROR_MAP
from ..common import NotifyImageSize from ..common import NotifyImageSize
from ..utils import compat_is_basestring
from ..utils import parse_bool from ..utils import parse_bool
from ..utils import parse_list
from ..common import NotifyFormat from ..common import NotifyFormat
TELEGRAM_IMAGE_XY = NotifyImageSize.XY_256 TELEGRAM_IMAGE_XY = NotifyImageSize.XY_256
@ -81,9 +81,6 @@ IS_CHAT_ID_RE = re.compile(
re.IGNORECASE, re.IGNORECASE,
) )
# Used to break path apart into list of chat identifiers
CHAT_ID_LIST_DELIM = re.compile(r'[ \t\r\n,#\\/]+')
class NotifyTelegram(NotifyBase): class NotifyTelegram(NotifyBase):
""" """
@ -134,16 +131,8 @@ class NotifyTelegram(NotifyBase):
# Store our Bot Token # Store our Bot Token
self.bot_token = result.group('key') self.bot_token = result.group('key')
if compat_is_basestring(chat_ids): # Parse our list
self.chat_ids = [x for x in filter(bool, CHAT_ID_LIST_DELIM.split( self.chat_ids = parse_list(chat_ids)
chat_ids,
))]
elif isinstance(chat_ids, (set, tuple, list)):
self.chat_ids = list(chat_ids)
else:
self.chat_ids = list()
if self.user: if self.user:
# Treat this as a channel too # Treat this as a channel too
@ -153,7 +142,7 @@ class NotifyTelegram(NotifyBase):
_id = self.detect_bot_owner() _id = self.detect_bot_owner()
if _id: if _id:
# Store our id # Store our id
self.chat_ids = [str(_id)] self.chat_ids.append(str(_id))
if len(self.chat_ids) == 0: if len(self.chat_ids) == 0:
self.logger.warning('No chat_id(s) were specified.') self.logger.warning('No chat_id(s) were specified.')
@ -501,6 +490,25 @@ class NotifyTelegram(NotifyBase):
return not has_error return not has_error
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
}
# No need to check the user token because the user automatically gets
# appended into the list of chat ids
return '{schema}://{bot_token}/{targets}/?{args}'.format(
schema=self.secure_protocol,
bot_token=self.quote(self.bot_token, safe=''),
targets='/'.join(
[self.quote('@{}'.format(x)) for x in self.chat_ids]),
args=self.urlencode(args))
@staticmethod @staticmethod
def parse_url(url): def parse_url(url):
""" """
@ -512,17 +520,17 @@ class NotifyTelegram(NotifyBase):
# tgram:// messages since the bot_token has a colon in it. # tgram:// messages since the bot_token has a colon in it.
# It invalidates an normal URL. # It invalidates an normal URL.
# This hack searches for this bogus URL and corrects it # This hack searches for this bogus URL and corrects it so we can
# so we can properly load it further down. The other # properly load it further down. The other alternative is to ask users
# alternative is to ask users to actually change the colon # to actually change the colon into a slash (which will work too), but
# into a slash (which will work too), but it's more likely # it's more likely to cause confusion... So this is the next best thing
# to cause confusion... So this is the next best thing # we also check for %3A (incase the URL is encoded) as %3A == :
try: try:
tgram = re.match( tgram = re.match(
r'(?P<protocol>%s://)(bot)?(?P<prefix>([a-z0-9_-]+)' r'(?P<protocol>{schema}://)(bot)?(?P<prefix>([a-z0-9_-]+)'
r'(:[a-z0-9_-]+)?@)?(?P<btoken_a>[0-9]+):+' r'(:[a-z0-9_-]+)?@)?(?P<btoken_a>[0-9]+)(:|%3A)+'
r'(?P<remaining>.*)$' % NotifyTelegram.secure_protocol, r'(?P<remaining>.*)$'.format(
url, re.I) schema=NotifyTelegram.secure_protocol), url, re.I)
except (TypeError, AttributeError): except (TypeError, AttributeError):
# url is bad; force tgram to be None # url is bad; force tgram to be None
@ -534,14 +542,11 @@ class NotifyTelegram(NotifyBase):
if tgram.group('prefix'): if tgram.group('prefix'):
# Try again # Try again
results = NotifyBase.parse_url( results = NotifyBase.parse_url('%s%s%s/%s' % (
'%s%s%s/%s' % ( tgram.group('protocol'),
tgram.group('protocol'), tgram.group('prefix'),
tgram.group('prefix'), tgram.group('btoken_a'),
tgram.group('btoken_a'), tgram.group('remaining')))
tgram.group('remaining'),
),
)
else: else:
# Try again # Try again
@ -562,9 +567,8 @@ class NotifyTelegram(NotifyBase):
bot_token = '%s:%s' % (bot_token_a, bot_token_b) bot_token = '%s:%s' % (bot_token_a, bot_token_b)
chat_ids = ','.join( chat_ids = [x for x in filter(
[x for x in filter( bool, NotifyBase.split_path(results['fullpath']))][1:]
bool, NotifyBase.split_path(results['fullpath']))][1:])
# Store our bot token # Store our bot token
results['bot_token'] = bot_token results['bot_token'] = bot_token

View File

@ -109,7 +109,9 @@ class NotifyTwitter(NotifyBase):
) )
return False return False
text = '%s\r\n%s' % (title, body) # Only set title if it was specified
text = body if not title else '%s\r\n%s' % (title, body)
try: try:
# Get our API # Get our API
api = tweepy.API(self.auth) api = tweepy.API(self.auth)

View File

@ -175,6 +175,13 @@ class NotifyWindows(NotifyBase):
return True return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
return '{schema}://'.format(schema=self.protocol)
@staticmethod @staticmethod
def parse_url(url): def parse_url(url):
""" """

View File

@ -44,11 +44,16 @@ class NotifyXBMC(NotifyBase):
# The services URL # The services URL
service_url = 'http://kodi.tv/' service_url = 'http://kodi.tv/'
xbmc_protocol = 'xbmc'
xbmc_secure_protocol = 'xbmcs'
kodi_protocol = 'kodi'
kodi_secure_protocol = 'kodis'
# The default protocols # The default protocols
protocol = ('xbmc', 'kodi') protocol = (xbmc_protocol, kodi_protocol)
# The default secure protocols # The default secure protocols
secure_protocol = ('xbmc', 'kodis') secure_protocol = (xbmc_secure_protocol, kodi_secure_protocol)
# A URL that takes you to the setup/help of the specific protocol # A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_kodi' setup_url = 'https://github.com/caronc/apprise/wiki/Notify_kodi'
@ -224,6 +229,44 @@ class NotifyXBMC(NotifyBase):
return True return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
}
# Determine Authentication
auth = ''
if self.user and self.password:
auth = '{user}:{password}@'.format(
user=self.quote(self.user, safe=''),
password=self.quote(self.password, safe=''),
)
elif self.user:
auth = '{user}@'.format(
user=self.quote(self.user, safe=''),
)
default_schema = self.xbmc_protocol if (
self.protocol <= self.xbmc_remote_protocol) else self.kodi_protocol
default_port = 443 if self.secure else self.xbmc_default_port
if self.secure:
# Append 's' to schema
default_schema + 's'
return '{schema}://{auth}{hostname}{port}/?{args}'.format(
schema=default_schema,
auth=auth,
hostname=self.host,
port='' if not self.port or self.port == default_port
else ':{}'.format(self.port),
args=self.urlencode(args),
)
@staticmethod @staticmethod
def parse_url(url): def parse_url(url):
""" """

View File

@ -85,6 +85,39 @@ class NotifyXML(NotifyBase):
return return
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
}
# Determine Authentication
auth = ''
if self.user and self.password:
auth = '{user}:{password}@'.format(
user=self.quote(self.user, safe=''),
password=self.quote(self.password, safe=''),
)
elif self.user:
auth = '{user}@'.format(
user=self.quote(self.user, safe=''),
)
default_port = 443 if self.secure else 80
return '{schema}://{auth}{hostname}{port}/?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=self.host,
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
args=self.urlencode(args),
)
def notify(self, title, body, notify_type, **kwargs): def notify(self, title, body, notify_type, **kwargs):
""" """
Perform XML Notification Perform XML Notification

View File

@ -397,6 +397,10 @@ def parse_list(*args):
elif isinstance(arg, (set, list, tuple)): elif isinstance(arg, (set, list, tuple)):
result += parse_list(*arg) result += parse_list(*arg)
elif arg is None:
# Ignore
continue
else: else:
# Convert whatever it is to a string and work with it # Convert whatever it is to a string and work with it
result += parse_list(str(arg)) result += parse_list(str(arg))

View File

@ -71,15 +71,34 @@ def test_apprise():
a = Apprise(servers=servers) a = Apprise(servers=servers)
# 3 servers loaded # 2 servers loaded
assert(len(a) == 2) assert(len(a) == 2)
# We can retrieve our URLs this way:
assert(len(a.urls()) == 2)
# We can add another server # We can add another server
assert( assert(
a.add('mmosts://mattermost.server.local/' a.add('mmosts://mattermost.server.local/'
'3ccdd113474722377935511fc85d3dd4') is True) '3ccdd113474722377935511fc85d3dd4') is True)
assert(len(a) == 3) assert(len(a) == 3)
# We can pop an object off of our stack by it's indexed value:
obj = a.pop(0)
assert(isinstance(obj, NotifyBase) is True)
assert(len(a) == 2)
# We can retrieve elements from our list too by reference:
assert(compat_is_basestring(a[0].url()) is True)
# We can iterate over our list too:
count = 0
for o in a:
assert(compat_is_basestring(o.url()) is True)
count += 1
# verify that we did indeed iterate over each element
assert(len(a) == count)
# We can empty our set # We can empty our set
a.clear() a.clear()
assert(len(a) == 0) assert(len(a) == 0)

View File

@ -26,6 +26,7 @@
from apprise import plugins from apprise import plugins
from apprise import NotifyType from apprise import NotifyType
from apprise import Apprise from apprise import Apprise
from apprise.utils import compat_is_basestring
from apprise.plugins import NotifyEmailBase from apprise.plugins import NotifyEmailBase
import smtplib import smtplib
@ -49,7 +50,7 @@ TEST_URLS = (
# No Username # No Username
('mailtos://:pass@nuxref.com:567', { ('mailtos://:pass@nuxref.com:567', {
# Can't prepare a To address using this expression # Can't prepare a To address using this expression
'exception': TypeError, 'instance': TypeError,
}), }),
# Pre-Configured Email Services # Pre-Configured Email Services
@ -115,27 +116,27 @@ TEST_URLS = (
}), }),
# Invalid From Address # Invalid From Address
('mailtos://user:pass@nuxref.com?from=@', { ('mailtos://user:pass@nuxref.com?from=@', {
'exception': TypeError, 'instance': TypeError,
}), }),
# Invalid From Address # Invalid From Address
('mailtos://nuxref.com?user=&pass=.', { ('mailtos://nuxref.com?user=&pass=.', {
'exception': TypeError, 'instance': TypeError,
}), }),
# Invalid To Address # Invalid To Address
('mailtos://user:pass@nuxref.com?to=@', { ('mailtos://user:pass@nuxref.com?to=@', {
'exception': TypeError, 'instance': TypeError,
}), }),
# Valid URL, but can't structure a proper email # Valid URL, but can't structure a proper email
('mailtos://nuxref.com?user=%20!&pass=.', { ('mailtos://nuxref.com?user=%20!&pass=.', {
'exception': TypeError, 'instance': TypeError,
}), }),
# Invalid From (and To) Address # Invalid From (and To) Address
('mailtos://nuxref.com?to=test', { ('mailtos://nuxref.com?to=test', {
'exception': TypeError, 'instance': TypeError,
}), }),
# Invalid Secure Mode # Invalid Secure Mode
('mailtos://user:pass@example.com?mode=notamode', { ('mailtos://user:pass@example.com?mode=notamode', {
'exception': TypeError, 'instance': TypeError,
}), }),
# STARTTLS flag checking # STARTTLS flag checking
('mailtos://user:pass@gmail.com?mode=starttls', { ('mailtos://user:pass@gmail.com?mode=starttls', {
@ -172,9 +173,6 @@ def test_email_plugin(mock_smtp, mock_smtpssl):
# Our expected instance # Our expected instance
instance = meta.get('instance', None) instance = meta.get('instance', None)
# Our expected exception
exception = meta.get('exception', None)
# Our expected server objects # Our expected server objects
self = meta.get('self', None) self = meta.get('self', None)
@ -217,19 +215,37 @@ def test_email_plugin(mock_smtp, mock_smtpssl):
try: try:
obj = Apprise.instantiate(url, suppress_exceptions=False) obj = Apprise.instantiate(url, suppress_exceptions=False)
assert(exception is None)
if obj is None: if obj is None:
# We're done # We're done (assuming this is what we were expecting)
assert instance is None
continue continue
if instance is None: if instance is None:
# Expected None but didn't get it # Expected None but didn't get it
print('%s instantiated %s' % (url, str(obj))) print('%s instantiated %s (but expected None)' % (
url, str(obj)))
assert(False) assert(False)
assert(isinstance(obj, instance)) assert(isinstance(obj, instance))
if isinstance(obj, plugins.NotifyBase.NotifyBase):
# We loaded okay; now lets make sure we can reverse this url
assert(compat_is_basestring(obj.url()) is True)
# Instantiate the exact same object again using the URL from
# the one that was already created properly
obj_cmp = Apprise.instantiate(obj.url())
# Our object should be the same instance as what we had
# originally expected above.
if not isinstance(obj_cmp, plugins.NotifyBase.NotifyBase):
# Assert messages are hard to trace back with the way
# these tests work. Just printing before throwing our
# assertion failure makes things easier to debug later on
print('TEST FAIL: {} regenerated as {}'.format(
url, obj.url()))
assert(False)
if self: if self:
# Iterate over our expected entries inside of our object # Iterate over our expected entries inside of our object
for key, val in self.items(): for key, val in self.items():
@ -258,16 +274,17 @@ def test_email_plugin(mock_smtp, mock_smtpssl):
except Exception as e: except Exception as e:
# We can't handle this exception type # We can't handle this exception type
print('%s / %s' % (url, str(e))) raise
assert False
except AssertionError: except AssertionError:
# Don't mess with these entries # Don't mess with these entries
print('%s AssertionError' % url)
raise raise
except Exception as e: except Exception as e:
# Check that we were expecting this exception to happen # Check that we were expecting this exception to happen
assert isinstance(e, response) if not isinstance(e, response):
raise
except AssertionError: except AssertionError:
# Don't mess with these entries # Don't mess with these entries
@ -276,9 +293,11 @@ def test_email_plugin(mock_smtp, mock_smtpssl):
except Exception as e: except Exception as e:
# Handle our exception # Handle our exception
print('%s / %s' % (url, str(e))) if(instance is None):
assert(exception is not None) raise
assert(isinstance(e, exception))
if not isinstance(e, instance):
raise
@mock.patch('smtplib.SMTP') @mock.patch('smtplib.SMTP')

View File

@ -28,6 +28,7 @@ import mock
import sys import sys
import types import types
import apprise import apprise
from apprise.utils import compat_is_basestring
try: try:
# Python v3.4+ # Python v3.4+
@ -223,6 +224,9 @@ def test_dbus_plugin(mock_mainloop, mock_byte, mock_bytearray,
assert(isinstance(obj, apprise.plugins.NotifyDBus) is True) assert(isinstance(obj, apprise.plugins.NotifyDBus) is True)
obj.duration = 0 obj.duration = 0
# Test url() call
assert(compat_is_basestring(obj.url()) is True)
# Our notification succeeds even though the gi library was not loaded # Our notification succeeds even though the gi library was not loaded
assert(obj.notify(title='title', body='body', assert(obj.notify(title='title', body='body',
notify_type=apprise.NotifyType.INFO) is True) notify_type=apprise.NotifyType.INFO) is True)

View File

@ -28,6 +28,7 @@ import sys
import types import types
import apprise import apprise
from apprise.utils import compat_is_basestring
try: try:
# Python v3.4+ # Python v3.4+
@ -113,6 +114,9 @@ def test_gnome_plugin():
# Check that it found our mocked environments # Check that it found our mocked environments
assert(obj._enabled is True) assert(obj._enabled is True)
# Test url() call
assert(compat_is_basestring(obj.url()) is True)
# test notifications # test notifications
assert(obj.notify(title='title', body='body', assert(obj.notify(title='title', body='body',
notify_type=apprise.NotifyType.INFO) is True) notify_type=apprise.NotifyType.INFO) is True)

View File

@ -26,6 +26,8 @@
from apprise import plugins from apprise import plugins
from apprise import NotifyType from apprise import NotifyType
from apprise import Apprise from apprise import Apprise
from apprise.utils import compat_is_basestring
import mock import mock
import re import re
@ -195,7 +197,6 @@ def test_growl_plugin(mock_gntp):
except Exception as e: except Exception as e:
# We can't handle this exception type # We can't handle this exception type
print('%s / %s' % (url, str(e)))
assert False assert False
# We're done this part of the test # We're done this part of the test
@ -216,10 +217,27 @@ def test_growl_plugin(mock_gntp):
if instance is None: if instance is None:
# Expected None but didn't get it # Expected None but didn't get it
print('%s instantiated %s' % (url, str(obj)))
assert(False) assert(False)
assert(isinstance(obj, instance)) assert(isinstance(obj, instance) is True)
if isinstance(obj, plugins.NotifyBase.NotifyBase):
# We loaded okay; now lets make sure we can reverse this url
assert(compat_is_basestring(obj.url()) is True)
# Instantiate the exact same object again using the URL from
# the one that was already created properly
obj_cmp = Apprise.instantiate(obj.url())
# Our object should be the same instance as what we had
# originally expected above.
if not isinstance(obj_cmp, plugins.NotifyBase.NotifyBase):
# Assert messages are hard to trace back with the way
# these tests work. Just printing before throwing our
# assertion failure makes things easier to debug later on
print('TEST FAIL: {} regenerated as {}'.format(
url, obj.url()))
assert(False)
if self: if self:
# Iterate over our expected entries inside of our object # Iterate over our expected entries inside of our object

View File

@ -52,6 +52,16 @@ def test_notify_base():
nb = NotifyBase(port=10) nb = NotifyBase(port=10)
assert nb.port == 10 assert nb.port == 10
try:
nb.url()
assert False
except NotImplementedError:
# Each sub-module is that inherits this as a parent is required to
# over-ride this function. So direct calls to this throws a not
# implemented error intentionally
assert True
# Throttle overrides.. # Throttle overrides..
nb = NotifyBase() nb = NotifyBase()
nb.throttle_attempt = 0.0 nb.throttle_attempt = 0.0

View File

@ -26,6 +26,8 @@
from apprise import plugins from apprise import plugins
from apprise import NotifyType from apprise import NotifyType
from apprise import Apprise from apprise import Apprise
from apprise.utils import compat_is_basestring
import mock import mock
TEST_URLS = ( TEST_URLS = (
@ -104,13 +106,37 @@ def test_plugin(mock_refresh, mock_send):
try: try:
obj = Apprise.instantiate(url, suppress_exceptions=False) obj = Apprise.instantiate(url, suppress_exceptions=False)
if instance is None: if obj is None:
# Check that we got what we came for # We're done (assuming this is what we were expecting)
assert obj is instance assert instance is None
continue continue
if instance is None:
# Expected None but didn't get it
print('%s instantiated %s (but expected None)' % (
url, str(obj)))
assert(False)
assert(isinstance(obj, instance)) assert(isinstance(obj, instance))
if isinstance(obj, plugins.NotifyBase.NotifyBase):
# We loaded okay; now lets make sure we can reverse this url
assert(compat_is_basestring(obj.url()) is True)
# Instantiate the exact same object again using the URL from
# the one that was already created properly
obj_cmp = Apprise.instantiate(obj.url())
# Our object should be the same instance as what we had
# originally expected above.
if not isinstance(obj_cmp, plugins.NotifyBase.NotifyBase):
# Assert messages are hard to trace back with the way
# these tests work. Just printing before throwing our
# assertion failure makes things easier to debug later on
print('TEST FAIL: {} regenerated as {}'.format(
url, obj.url()))
assert(False)
if self: if self:
# Iterate over our expected entries inside of our object # Iterate over our expected entries inside of our object
for key, val in self.items(): for key, val in self.items():
@ -144,21 +170,27 @@ def test_plugin(mock_refresh, mock_send):
except Exception as e: except Exception as e:
# We can't handle this exception type # We can't handle this exception type
assert False raise
except AssertionError: except AssertionError:
# Don't mess with these entries # Don't mess with these entries
print('%s AssertionError' % url)
raise raise
except Exception as e: except Exception as e:
# Check that we were expecting this exception to happen # Check that we were expecting this exception to happen
assert isinstance(e, response) if not isinstance(e, response):
raise
except AssertionError: except AssertionError:
# Don't mess with these entries # Don't mess with these entries
print('%s AssertionError' % url)
raise raise
except Exception as e: except Exception as e:
# Handle our exception # Handle our exception
assert(instance is not None) if(instance is None):
assert(isinstance(e, instance)) raise
if not isinstance(e, instance):
raise

View File

@ -263,42 +263,13 @@ TEST_URLS = (
('ifttt://EventID/', { ('ifttt://EventID/', {
'instance': TypeError, 'instance': TypeError,
}), }),
# Value1 gets assigned Entry1
# Title = <assigned title>
# Body = <assigned body>
('ifttt://WebHookID@EventID/Entry1/', {
'instance': plugins.NotifyIFTTT,
}),
# Value1, Value2, and Value2, the below assigns:
# Value1 = Entry1
# Value2 = AnotherEntry
# Value3 = ThirdValue
# Title = <assigned title>
# Body = <assigned body>
('ifttt://WebHookID@EventID/Entry1/AnotherEntry/ThirdValue', {
'instance': plugins.NotifyIFTTT,
}),
# Mix and match content, the below assigns:
# Value1 = FirstValue
# AnotherKey = Hello
# Value5 = test
# Title = <assigned title>
# Body = <assigned body>
('ifttt://WebHookID@EventID/FirstValue/?AnotherKey=Hello&Value5=test', {
'instance': plugins.NotifyIFTTT,
}),
# This would assign:
# Value1 = FirstValue
# Title = <blank> - disable the one passed by the notify call
# Body = <blank> - disable the one passed by the notify call
# The idea here is maybe you just want to use the apprise IFTTTT hook
# to trigger something and not nessisarily pass text along to it
('ifttt://WebHookID@EventID/FirstValue/?Title=&Body=', {
'instance': plugins.NotifyIFTTT,
}),
('ifttt://:@/', { ('ifttt://:@/', {
'instance': None, 'instance': None,
}), }),
# A nicely formed ifttt url with 2 events defined:
('ifttt://WebHookID@EventID/EventID2/', {
'instance': plugins.NotifyIFTTT,
}),
# Test website connection failures # Test website connection failures
('ifttt://WebHookID@EventID', { ('ifttt://WebHookID@EventID', {
'instance': plugins.NotifyIFTTT, 'instance': plugins.NotifyIFTTT,
@ -392,6 +363,9 @@ TEST_URLS = (
('json://user:pass@localhost', { ('json://user:pass@localhost', {
'instance': plugins.NotifyJSON, 'instance': plugins.NotifyJSON,
}), }),
('json://user@localhost', {
'instance': plugins.NotifyJSON,
}),
('json://localhost:8080', { ('json://localhost:8080', {
'instance': plugins.NotifyJSON, 'instance': plugins.NotifyJSON,
}), }),
@ -529,8 +503,8 @@ TEST_URLS = (
('matrix://user@localhost/%s' % ('a' * 64), { ('matrix://user@localhost/%s' % ('a' * 64), {
'instance': plugins.NotifyMatrix, 'instance': plugins.NotifyMatrix,
}), }),
# Name, port and token (secure) # port and token (secure)
('matrixs://user@localhost:9000/%s' % ('a' * 64), { ('matrixs://localhost:9000/%s' % ('a' * 64), {
'instance': plugins.NotifyMatrix, 'instance': plugins.NotifyMatrix,
}), }),
# Name, port, token and slack mode # Name, port, token and slack mode
@ -963,6 +937,14 @@ TEST_URLS = (
('rocket://user:pass@localhost/#/!/@', { ('rocket://user:pass@localhost/#/!/@', {
'instance': TypeError, 'instance': TypeError,
}), }),
# No user/pass combo
('rocket://user@localhost/room/', {
'instance': TypeError,
}),
# No user/pass combo
('rocket://localhost/room/', {
'instance': TypeError,
}),
# A room and port identifier # A room and port identifier
('rocket://user:pass@localhost:8080/room/', { ('rocket://user:pass@localhost:8080/room/', {
'instance': plugins.NotifyRocketChat, 'instance': plugins.NotifyRocketChat,
@ -1383,7 +1365,7 @@ TEST_URLS = (
('xbmc://user:pass@localhost:8080', { ('xbmc://user:pass@localhost:8080', {
'instance': plugins.NotifyXBMC, 'instance': plugins.NotifyXBMC,
}), }),
('xbmc://localhost', { ('xbmc://user@localhost', {
'instance': plugins.NotifyXBMC, 'instance': plugins.NotifyXBMC,
# don't include an image by default # don't include an image by default
'include_image': False, 'include_image': False,
@ -1432,6 +1414,9 @@ TEST_URLS = (
('xml://localhost', { ('xml://localhost', {
'instance': plugins.NotifyXML, 'instance': plugins.NotifyXML,
}), }),
('xml://user@localhost', {
'instance': plugins.NotifyXML,
}),
('xml://user:pass@localhost', { ('xml://user:pass@localhost', {
'instance': plugins.NotifyXML, 'instance': plugins.NotifyXML,
}), }),
@ -1487,6 +1472,8 @@ def test_rest_plugins(mock_post, mock_get):
API: REST Based Plugins() API: REST Based Plugins()
""" """
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.throttle_attempt = 0
# iterate over our dictionary and test it out # iterate over our dictionary and test it out
for (url, meta) in TEST_URLS: for (url, meta) in TEST_URLS:
@ -1567,10 +1554,31 @@ def test_rest_plugins(mock_post, mock_get):
assert instance is None assert instance is None
continue continue
if instance is None:
# Expected None but didn't get it
print('%s instantiated %s (but expected None)' % (
url, str(obj)))
assert(False)
assert(isinstance(obj, instance)) assert(isinstance(obj, instance))
# Disable throttling to speed up unit tests if isinstance(obj, plugins.NotifyBase.NotifyBase):
obj.throttle_attempt = 0 # We loaded okay; now lets make sure we can reverse this url
assert(compat_is_basestring(obj.url()) is True)
# Instantiate the exact same object again using the URL from
# the one that was already created properly
obj_cmp = Apprise.instantiate(obj.url())
# Our object should be the same instance as what we had
# originally expected above.
if not isinstance(obj_cmp, plugins.NotifyBase.NotifyBase):
# Assert messages are hard to trace back with the way
# these tests work. Just printing before throwing our
# assertion failure makes things easier to debug later on
print('TEST FAIL: {} regenerated as {}'.format(
url, obj.url()))
assert(False)
if self: if self:
# Iterate over our expected entries inside of our object # Iterate over our expected entries inside of our object
@ -1605,8 +1613,7 @@ def test_rest_plugins(mock_post, mock_get):
except Exception as e: except Exception as e:
# We can't handle this exception type # We can't handle this exception type
print('%s / %s' % (url, str(e))) raise
assert False
except AssertionError: except AssertionError:
# Don't mess with these entries # Don't mess with these entries
@ -1615,7 +1622,8 @@ def test_rest_plugins(mock_post, mock_get):
except Exception as e: except Exception as e:
# Check that we were expecting this exception to happen # Check that we were expecting this exception to happen
assert isinstance(e, response) if not isinstance(e, response):
raise
# #
# Stage 2: without title defined # Stage 2: without title defined
@ -1643,8 +1651,7 @@ def test_rest_plugins(mock_post, mock_get):
except Exception as e: except Exception as e:
# We can't handle this exception type # We can't handle this exception type
print('%s / %s' % (url, str(e))) raise
assert False
except AssertionError: except AssertionError:
# Don't mess with these entries # Don't mess with these entries
@ -1653,7 +1660,8 @@ def test_rest_plugins(mock_post, mock_get):
except Exception as e: except Exception as e:
# Check that we were expecting this exception to happen # Check that we were expecting this exception to happen
assert isinstance(e, response) if not isinstance(e, response):
raise
except AssertionError: except AssertionError:
# Don't mess with these entries # Don't mess with these entries
@ -1662,9 +1670,11 @@ def test_rest_plugins(mock_post, mock_get):
except Exception as e: except Exception as e:
# Handle our exception # Handle our exception
print('%s / %s' % (url, str(e))) if(instance is None):
assert(instance is not None) raise
assert(isinstance(e, instance))
if not isinstance(e, instance):
raise
@mock.patch('requests.get') @mock.patch('requests.get')
@ -1674,6 +1684,9 @@ def test_notify_boxcar_plugin(mock_post, mock_get):
API: NotifyBoxcar() Extra Checks API: NotifyBoxcar() Extra Checks
""" """
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.throttle_attempt = 0
# Generate some generic message types # Generate some generic message types
device = 'A' * 64 device = 'A' * 64
tag = '@B' * 63 tag = '@B' * 63
@ -1724,9 +1737,6 @@ def test_notify_boxcar_plugin(mock_post, mock_get):
# Test notifications without a body or a title # Test notifications without a body or a title
p = plugins.NotifyBoxcar(access=access, secret=secret, recipients=None) p = plugins.NotifyBoxcar(access=access, secret=secret, recipients=None)
# Disable throttling to speed up unit tests
p.throttle_attempt = 0
p.notify(body=None, title=None, notify_type=NotifyType.INFO) is True p.notify(body=None, title=None, notify_type=NotifyType.INFO) is True
@ -1737,6 +1747,8 @@ def test_notify_discord_plugin(mock_post, mock_get):
API: NotifyDiscord() Extra Checks API: NotifyDiscord() Extra Checks
""" """
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.throttle_attempt = 0
# Initialize some generic (but valid) tokens # Initialize some generic (but valid) tokens
webhook_id = 'A' * 24 webhook_id = 'A' * 24
@ -1762,9 +1774,6 @@ def test_notify_discord_plugin(mock_post, mock_get):
webhook_token=webhook_token, webhook_token=webhook_token,
footer=True, thumbnail=False) footer=True, thumbnail=False)
# Disable throttling to speed up unit tests
obj.throttle_attempt = 0
# This call includes an image with it's payload: # This call includes an image with it's payload:
assert obj.notify(title='title', body='body', assert obj.notify(title='title', body='body',
notify_type=NotifyType.INFO) is True notify_type=NotifyType.INFO) is True
@ -2190,10 +2199,12 @@ def test_notify_ifttt_plugin(mock_post, mock_get):
API: NotifyIFTTT() Extra Checks API: NotifyIFTTT() Extra Checks
""" """
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.throttle_attempt = 0
# Initialize some generic (but valid) tokens # Initialize some generic (but valid) tokens
apikey = 'webhookid' webhook_id = 'webhookid'
event = 'event' events = ['event1', 'event2']
# Prepare Mock # Prepare Mock
mock_get.return_value = requests.Request() mock_get.return_value = requests.Request()
@ -2204,7 +2215,7 @@ def test_notify_ifttt_plugin(mock_post, mock_get):
mock_post.return_value.content = '{}' mock_post.return_value.content = '{}'
try: try:
obj = plugins.NotifyIFTTT(apikey=apikey, event=None, event_args=None) obj = plugins.NotifyIFTTT(webhook_id=webhook_id, events=None)
# No token specified # No token specified
assert(False) assert(False)
@ -2212,12 +2223,8 @@ def test_notify_ifttt_plugin(mock_post, mock_get):
# Exception should be thrown about the fact no token was specified # Exception should be thrown about the fact no token was specified
assert(True) assert(True)
obj = plugins.NotifyIFTTT(apikey=apikey, event=event, event_args=None) obj = plugins.NotifyIFTTT(webhook_id=webhook_id, events=events)
assert(isinstance(obj, plugins.NotifyIFTTT)) assert(isinstance(obj, plugins.NotifyIFTTT))
assert(len(obj.event_args) == 0)
# Disable throttling to speed up unit tests
obj.throttle_attempt = 0
assert obj.notify(title='title', body='body', assert obj.notify(title='title', body='body',
notify_type=NotifyType.INFO) is True notify_type=NotifyType.INFO) is True
@ -2230,6 +2237,9 @@ def test_notify_join_plugin(mock_post, mock_get):
API: NotifyJoin() Extra Checks API: NotifyJoin() Extra Checks
""" """
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.throttle_attempt = 0
# Generate some generic message types # Generate some generic message types
device = 'A' * 32 device = 'A' * 32
group = 'group.chrome' group = 'group.chrome'
@ -2250,9 +2260,6 @@ def test_notify_join_plugin(mock_post, mock_get):
mock_post.return_value.status_code = requests.codes.created mock_post.return_value.status_code = requests.codes.created
mock_get.return_value.status_code = requests.codes.created mock_get.return_value.status_code = requests.codes.created
# Disable throttling to speed up unit tests
p.throttle_attempt = 0
# Test notifications without a body or a title; nothing to send # Test notifications without a body or a title; nothing to send
# so we return False # so we return False
p.notify(body=None, title=None, notify_type=NotifyType.INFO) is False p.notify(body=None, title=None, notify_type=NotifyType.INFO) is False
@ -2265,6 +2272,8 @@ def test_notify_slack_plugin(mock_post, mock_get):
API: NotifySlack() Extra Checks API: NotifySlack() Extra Checks
""" """
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.throttle_attempt = 0
# Initialize some generic (but valid) tokens # Initialize some generic (but valid) tokens
token_a = 'A' * 9 token_a = 'A' * 9
@ -2300,9 +2309,6 @@ def test_notify_slack_plugin(mock_post, mock_get):
token_a=token_a, token_b=token_b, token_c=token_c, channels=channels, token_a=token_a, token_b=token_b, token_c=token_c, channels=channels,
include_image=True) include_image=True)
# Disable throttling to speed up unit tests
obj.throttle_attempt = 0
# This call includes an image with it's payload: # This call includes an image with it's payload:
assert obj.notify(title='title', body='body', assert obj.notify(title='title', body='body',
notify_type=NotifyType.INFO) is True notify_type=NotifyType.INFO) is True
@ -2358,6 +2364,9 @@ def test_notify_pushed_plugin(mock_post, mock_get):
API: NotifyPushed() Extra Checks API: NotifyPushed() Extra Checks
""" """
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.throttle_attempt = 0
# Chat ID # Chat ID
recipients = '@ABCDEFG, @DEFGHIJ, #channel, #channel2' recipients = '@ABCDEFG, @DEFGHIJ, #channel, #channel2'
@ -2436,9 +2445,6 @@ def test_notify_pushed_plugin(mock_post, mock_get):
assert(len(obj.channels) == 2) assert(len(obj.channels) == 2)
assert(len(obj.users) == 2) assert(len(obj.users) == 2)
# Disable throttling to speed up unit tests
obj.throttle_attempt = 0
# Support the handling of an empty and invalid URL strings # Support the handling of an empty and invalid URL strings
assert plugins.NotifyPushed.parse_url(None) is None assert plugins.NotifyPushed.parse_url(None) is None
assert plugins.NotifyPushed.parse_url('') is None assert plugins.NotifyPushed.parse_url('') is None
@ -2458,6 +2464,8 @@ def test_notify_pushover_plugin(mock_post, mock_get):
API: NotifyPushover() Extra Checks API: NotifyPushover() Extra Checks
""" """
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.throttle_attempt = 0
# Initialize some generic (but valid) tokens # Initialize some generic (but valid) tokens
token = 'a' * 30 token = 'a' * 30
@ -2487,9 +2495,6 @@ def test_notify_pushover_plugin(mock_post, mock_get):
assert(isinstance(obj, plugins.NotifyPushover)) assert(isinstance(obj, plugins.NotifyPushover))
assert(len(obj.devices) == 3) assert(len(obj.devices) == 3)
# Disable throttling to speed up unit tests
obj.throttle_attempt = 0
# This call fails because there is 1 invalid device # This call fails because there is 1 invalid device
assert obj.notify(title='title', body='body', assert obj.notify(title='title', body='body',
notify_type=NotifyType.INFO) is False notify_type=NotifyType.INFO) is False
@ -2500,9 +2505,6 @@ def test_notify_pushover_plugin(mock_post, mock_get):
# device defined here # device defined here
assert(len(obj.devices) == 1) assert(len(obj.devices) == 1)
# Disable throttling to speed up unit tests
obj.throttle_attempt = 0
# This call succeeds because all of the devices are valid # This call succeeds because all of the devices are valid
assert obj.notify(title='title', body='body', assert obj.notify(title='title', body='body',
notify_type=NotifyType.INFO) is True notify_type=NotifyType.INFO) is True
@ -2526,9 +2528,16 @@ def test_notify_rocketchat_plugin(mock_post, mock_get):
API: NotifyRocketChat() Extra Checks API: NotifyRocketChat() Extra Checks
""" """
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.throttle_attempt = 0
# Chat ID # Chat ID
recipients = 'l2g, lead2gold, #channel, #channel2' recipients = 'l2g, lead2gold, #channel, #channel2'
# Authentication
user = 'myuser'
password = 'mypass'
# Prepare Mock # Prepare Mock
mock_get.return_value = requests.Request() mock_get.return_value = requests.Request()
mock_post.return_value = requests.Request() mock_post.return_value = requests.Request()
@ -2538,7 +2547,8 @@ def test_notify_rocketchat_plugin(mock_post, mock_get):
mock_get.return_value.text = '' mock_get.return_value.text = ''
try: try:
obj = plugins.NotifyRocketChat(recipients=None) obj = plugins.NotifyRocketChat(
user=user, password=password, recipients=None)
# invalid recipients list (None) # invalid recipients list (None)
assert(False) assert(False)
@ -2548,7 +2558,8 @@ def test_notify_rocketchat_plugin(mock_post, mock_get):
assert(True) assert(True)
try: try:
obj = plugins.NotifyRocketChat(recipients=object()) obj = plugins.NotifyRocketChat(
user=user, password=password, recipients=object())
# invalid recipients list (object) # invalid recipients list (object)
assert(False) assert(False)
@ -2558,7 +2569,8 @@ def test_notify_rocketchat_plugin(mock_post, mock_get):
assert(True) assert(True)
try: try:
obj = plugins.NotifyRocketChat(recipients=set()) obj = plugins.NotifyRocketChat(
user=user, password=password, recipients=set())
# invalid recipient list/set (no entries) # invalid recipient list/set (no entries)
assert(False) assert(False)
@ -2567,14 +2579,12 @@ def test_notify_rocketchat_plugin(mock_post, mock_get):
# specified # specified
assert(True) assert(True)
obj = plugins.NotifyRocketChat(recipients=recipients) obj = plugins.NotifyRocketChat(
user=user, password=password, recipients=recipients)
assert(isinstance(obj, plugins.NotifyRocketChat)) assert(isinstance(obj, plugins.NotifyRocketChat))
assert(len(obj.channels) == 2) assert(len(obj.channels) == 2)
assert(len(obj.rooms) == 2) assert(len(obj.rooms) == 2)
# Disable throttling to speed up unit tests
obj.throttle_attempt = 0
# #
# Logout # Logout
# #
@ -2653,6 +2663,9 @@ def test_notify_telegram_plugin(mock_post, mock_get):
API: NotifyTelegram() Extra Checks API: NotifyTelegram() Extra Checks
""" """
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.throttle_attempt = 0
# Bot Token # Bot Token
bot_token = '123456789:abcdefg_hijklmnop' bot_token = '123456789:abcdefg_hijklmnop'
invalid_bot_token = 'abcd:123' invalid_bot_token = 'abcd:123'
@ -2710,6 +2723,12 @@ def test_notify_telegram_plugin(mock_post, mock_get):
assert(isinstance(obj, plugins.NotifyTelegram)) assert(isinstance(obj, plugins.NotifyTelegram))
assert(len(obj.chat_ids) == 2) assert(len(obj.chat_ids) == 2)
# test url call
assert(compat_is_basestring(obj.url()))
# Test that we can load the string we generate back:
obj = plugins.NotifyTelegram(**plugins.NotifyTelegram.parse_url(obj.url()))
assert(isinstance(obj, plugins.NotifyTelegram))
# Support the handling of an empty and invalid URL strings # Support the handling of an empty and invalid URL strings
assert(plugins.NotifyTelegram.parse_url(None) is None) assert(plugins.NotifyTelegram.parse_url(None) is None)
assert(plugins.NotifyTelegram.parse_url('') is None) assert(plugins.NotifyTelegram.parse_url('') is None)
@ -2730,10 +2749,6 @@ def test_notify_telegram_plugin(mock_post, mock_get):
nimg_obj = plugins.NotifyTelegram(bot_token=bot_token, chat_ids=chat_ids) nimg_obj = plugins.NotifyTelegram(bot_token=bot_token, chat_ids=chat_ids)
nimg_obj.asset = AppriseAsset(image_path_mask=False, image_url_mask=False) nimg_obj.asset = AppriseAsset(image_path_mask=False, image_url_mask=False)
# Disable throttling to speed up unit tests
nimg_obj.throttle_attempt = 0
obj.throttle_attempt = 0
# Test that our default settings over-ride base settings since they are # Test that our default settings over-ride base settings since they are
# not the same as the one specified in the base; this check merely # not the same as the one specified in the base; this check merely
# ensures our plugin inheritance is working properly # ensures our plugin inheritance is working properly

View File

@ -29,6 +29,7 @@ import types
# Rebuild our Apprise environment # Rebuild our Apprise environment
import apprise import apprise
from apprise.utils import compat_is_basestring
try: try:
# Python v3.4+ # Python v3.4+
@ -107,6 +108,9 @@ def test_windows_plugin():
obj = apprise.Apprise.instantiate('windows://', suppress_exceptions=False) obj = apprise.Apprise.instantiate('windows://', suppress_exceptions=False)
obj.duration = 0 obj.duration = 0
# Test URL functionality
assert(compat_is_basestring(obj.url()) is True)
# Check that it found our mocked environments # Check that it found our mocked environments
assert(obj._enabled is True) assert(obj._enabled is True)