Improved URL parsing; introducing IPV6 support (#269)

pull/270/head
Chris Caron 2020-08-08 09:08:49 -04:00 committed by GitHub
parent bc44bdca84
commit ad6316bda0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
65 changed files with 395 additions and 380 deletions

View File

@ -310,7 +310,7 @@ class AttachHTTP(AttachBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = AttachBase.parse_url(url)

View File

@ -252,7 +252,7 @@ class ConfigHTTP(ConfigBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = ConfigBase.parse_url(url)

View File

@ -346,7 +346,6 @@ class NotifyBoxcar(NotifyBase):
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early
return None

View File

@ -299,7 +299,7 @@ class NotifyClickSend(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)

View File

@ -422,11 +422,10 @@ class NotifyD7Networks(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -29,7 +29,6 @@ from __future__ import print_function
from .NotifyBase import NotifyBase
from ..common import NotifyImageSize
from ..common import NotifyType
from ..utils import GET_SCHEMA_RE
from ..utils import parse_bool
from ..AppriseLocale import gettext_lazy as _
@ -141,7 +140,6 @@ class NotifyDBus(NotifyBase):
# object if we were to reference, we wouldn't be backwards compatible with
# Python v2. So converting the result set back into a list makes us
# compatible
protocol = list(MAINLOOP_MAP.keys())
# A URL that takes you to the setup/help of the specific protocol
@ -153,7 +151,7 @@ class NotifyDBus(NotifyBase):
# Allows the user to specify the NotifyImageSize object
image_size = NotifyImageSize.XY_128
# The number of seconds to keep the message present for
# The number of milliseconds to keep the message present for
message_timeout_ms = 13000
# Limit results to just the first 10 line otherwise there is just to much
@ -171,7 +169,7 @@ class NotifyDBus(NotifyBase):
# Define object templates
templates = (
'{schema}://_/',
'{schema}://',
)
# Define our template arguments
@ -386,24 +384,8 @@ class NotifyDBus(NotifyBase):
is in place.
"""
schema = GET_SCHEMA_RE.match(url)
if schema is None:
# Content is simply not parseable
return None
results = NotifyBase.parse_url(url)
if not results:
results = {
'schema': schema.group('schema').lower(),
'user': None,
'password': None,
'port': None,
'host': '_',
'fullpath': None,
'path': None,
'url': url,
'qsd': {},
}
results = NotifyBase.parse_url(url, verify_host=False)
# Include images with our message
results['include_image'] = \

View File

@ -429,14 +429,13 @@ class NotifyDiscord(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
Syntax:
discord://webhook_id/webhook_token
"""
results = NotifyBase.parse_url(url)
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -737,7 +737,8 @@ class NotifyEmail(NotifyBase):
return '{schema}://{auth}{hostname}{port}/{targets}?{params}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=NotifyEmail.quote(self.host, safe=''),
# never encode hostname since we're expecting it to be a valid one
hostname=self.host,
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
targets='' if not has_targets else '/'.join(
@ -749,11 +750,10 @@ class NotifyEmail(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -61,9 +61,6 @@ class NotifyEmby(NotifyBase):
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_emby'
# Emby uses the http protocol with JSON requests
emby_default_port = 8096
# By default Emby requires you to provide it a device id
# The following was just a random uuid4 generated one. There
# is no real reason to change this, but hey; that's what open
@ -94,6 +91,7 @@ class NotifyEmby(NotifyBase):
'type': 'int',
'min': 1,
'max': 65535,
'default': 8096
},
'user': {
'name': _('Username'),
@ -137,6 +135,10 @@ class NotifyEmby(NotifyBase):
# or a modal type box (requires an Okay acknowledgement)
self.modal = modal
if not self.port:
# Assign default port if one isn't otherwise specified:
self.port = self.template_tokens['port']['default']
if not self.user:
# User was not specified
msg = 'No Emby username was specified.'
@ -620,8 +622,9 @@ class NotifyEmby(NotifyBase):
return '{schema}://{auth}{hostname}{port}/?{params}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=NotifyEmby.quote(self.host, safe=''),
port='' if self.port is None or self.port == self.emby_default_port
hostname=self.host,
port='' if self.port is None
or self.port == self.template_tokens['port']['default']
else ':{}'.format(self.port),
params=NotifyEmby.urlencode(params),
)
@ -659,7 +662,7 @@ class NotifyEmby(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
@ -667,10 +670,6 @@ class NotifyEmby(NotifyBase):
# We're done early
return results
# Assign Default Emby Port
if not results['port']:
results['port'] = NotifyEmby.emby_default_port
# Modal type popup (default False)
results['modal'] = parse_bool(results['qsd'].get('modal', False))

View File

@ -213,7 +213,8 @@ class NotifyEnigma2(NotifyBase):
return '{schema}://{auth}{hostname}{port}{fullpath}?{params}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=NotifyEnigma2.quote(self.host, safe=''),
# never encode hostname since we're expecting it to be a valid one
hostname=self.host,
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
fullpath=NotifyEnigma2.quote(self.fullpath, safe='/'),
@ -327,11 +328,10 @@ class NotifyEnigma2(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -29,6 +29,7 @@ from ..common import NotifyImageSize
from ..common import NotifyType
from ..utils import parse_bool
from ..AppriseLocale import gettext_lazy as _
from ..utils import validate_regex
class NotifyFaast(NotifyBase):
@ -86,7 +87,12 @@ class NotifyFaast(NotifyBase):
super(NotifyFaast, self).__init__(**kwargs)
# Store the Authentication Token
self.authtoken = authtoken
self.authtoken = validate_regex(authtoken)
if not self.authtoken:
msg = 'An invalid Faast Authentication Token ' \
'({}) was specified.'.format(authtoken)
self.logger.warning(msg)
raise TypeError(msg)
# Associate an image with our post
self.include_image = include_image
@ -187,11 +193,10 @@ class NotifyFaast(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -331,10 +331,9 @@ class NotifyFlock(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -386,11 +386,10 @@ class NotifyGitter(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -113,7 +113,7 @@ class NotifyGnome(NotifyBase):
# Define object templates
templates = (
'{schema}://_/',
'{schema}://',
)
# Define our template arguments
@ -224,7 +224,7 @@ class NotifyGnome(NotifyBase):
# Extend our parameters
params.update(self.url_parameters(privacy=privacy, *args, **kwargs))
return '{schema}://_/?{params}'.format(
return '{schema}://?{params}'.format(
schema=self.protocol,
params=NotifyGnome.urlencode(params),
)
@ -238,19 +238,7 @@ class NotifyGnome(NotifyBase):
"""
results = NotifyBase.parse_url(url)
if not results:
results = {
'schema': NotifyGnome.protocol,
'user': None,
'password': None,
'port': None,
'host': '_',
'fullpath': None,
'path': None,
'url': url,
'qsd': {},
}
results = NotifyBase.parse_url(url, verify_host=False)
# Include images with our message
results['include_image'] = \

View File

@ -255,7 +255,8 @@ class NotifyGotify(NotifyBase):
return '{schema}://{hostname}{port}{fullpath}{token}/?{params}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
hostname=NotifyGotify.quote(self.host, safe=''),
# never encode hostname since we're expecting it to be a valid one
hostname=self.host,
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
fullpath=NotifyGotify.quote(self.fullpath, safe='/'),
@ -267,7 +268,7 @@ class NotifyGotify(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)

View File

@ -359,7 +359,8 @@ class NotifyGrowl(NotifyBase):
return '{schema}://{auth}{hostname}{port}/?{params}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=NotifyGrowl.quote(self.host, safe=''),
# never encode hostname since we're expecting it to be a valid one
hostname=self.host,
port='' if self.port is None or self.port == self.default_port
else ':{}'.format(self.port),
params=NotifyGrowl.urlencode(params),
@ -369,11 +370,10 @@ class NotifyGrowl(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -310,11 +310,10 @@ class NotifyIFTTT(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -152,7 +152,8 @@ class NotifyJSON(NotifyBase):
return '{schema}://{auth}{hostname}{port}{fullpath}/?{params}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=NotifyJSON.quote(self.host, safe=''),
# never encode hostname since we're expecting it to be a valid one
hostname=self.host,
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
fullpath=NotifyJSON.quote(self.fullpath, safe='/'),
@ -248,11 +249,10 @@ class NotifyJSON(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -354,11 +354,10 @@ class NotifyJoin(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -341,11 +341,10 @@ class NotifyKavenegar(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -214,11 +214,10 @@ class NotifyKumulos(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -298,9 +298,6 @@ class NotifyLametric(NotifyBase):
# URL used for local notifications directly to the device
device_notify_url = '{schema}://{host}{port}/api/v2/device/notifications'
# LaMetric Default port
default_device_port = 8080
# The Device User ID
default_device_user = 'dev'
@ -350,6 +347,7 @@ class NotifyLametric(NotifyBase):
'type': 'int',
'min': 1,
'max': 65535,
'default': 8080,
},
'user': {
'name': _('Username'),
@ -609,7 +607,8 @@ class NotifyLametric(NotifyBase):
schema="https" if self.secure else "http",
host=self.host,
port=':{}'.format(
self.port if self.port else self.default_device_port))
self.port if self.port
else self.template_tokens['port']['default']))
# Return request parameters
return (notify_url, auth, payload)
@ -746,7 +745,7 @@ class NotifyLametric(NotifyBase):
# never encode hostname since we're expecting it to be a valid one
hostname=self.host,
port='' if self.port is None
or self.port == self.default_device_port
or self.port == self.template_tokens['port']['default']
else ':{}'.format(self.port),
params=NotifyLametric.urlencode(params),
)
@ -755,7 +754,7 @@ class NotifyLametric(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""

View File

@ -339,12 +339,11 @@ class NotifyMSG91(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -299,11 +299,10 @@ class NotifyMSTeams(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -69,7 +69,7 @@ class NotifyMacOSX(NotifyBase):
# Define object templates
templates = (
'{schema}://_/',
'{schema}://',
)
# Define our template arguments
@ -206,19 +206,7 @@ class NotifyMacOSX(NotifyBase):
"""
results = NotifyBase.parse_url(url)
if not results:
results = {
'schema': NotifyMacOSX.protocol,
'user': None,
'password': None,
'port': None,
'host': '_',
'fullpath': None,
'path': None,
'url': url,
'qsd': {},
}
results = NotifyBase.parse_url(url, verify_host=False)
# Include images with our message
results['include_image'] = \

View File

@ -340,11 +340,10 @@ class NotifyMailgun(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -1050,7 +1050,7 @@ class NotifyMatrix(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)

View File

@ -307,7 +307,9 @@ class NotifyMatterMost(NotifyBase):
'/?{params}'.format(
schema=default_schema,
botname=botname,
hostname=NotifyMatterMost.quote(self.host, safe=''),
# never encode hostname since we're expecting it to be a valid
# one
hostname=self.host,
port='' if not self.port or self.port == default_port
else ':{}'.format(self.port),
fullpath='/' if not self.fullpath else '{}/'.format(
@ -320,11 +322,10 @@ class NotifyMatterMost(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -329,12 +329,11 @@ class NotifyMessageBird(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results
@ -349,7 +348,7 @@ class NotifyMessageBird(NotifyBase):
except IndexError:
# No path specified... this URL is potentially un-parseable; we can
# hope for a from= entry
pass
results['source'] = None
# The hostname is our authentication key
results['apikey'] = NotifyMessageBird.unquote(results['host'])

View File

@ -347,11 +347,10 @@ class NotifyNexmo(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -253,7 +253,9 @@ class NotifyNextcloud(NotifyBase):
schema=self.secure_protocol
if self.secure else self.protocol,
auth=auth,
hostname=NotifyNextcloud.quote(self.host, safe=''),
# never encode hostname since we're expecting it to be a
# valid one
hostname=self.host,
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
targets='/'.join([NotifyNextcloud.quote(x)
@ -265,7 +267,7 @@ class NotifyNextcloud(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""

View File

@ -331,7 +331,7 @@ class NotifyNotica(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)

View File

@ -325,11 +325,11 @@ class NotifyNotifico(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -472,7 +472,7 @@ class NotifyOffice365(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""

View File

@ -274,12 +274,11 @@ class NotifyPopcornNotify(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -257,11 +257,10 @@ class NotifyProwl(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -395,11 +395,10 @@ class NotifyPushBullet(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -793,10 +793,10 @@ class NotifyPushSafer(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -326,11 +326,10 @@ class NotifyPushed(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -140,7 +140,8 @@ class NotifyPushjet(NotifyBase):
return '{schema}://{auth}{hostname}{port}/{secret}/?{params}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=NotifyPushjet.quote(self.host, safe=''),
# never encode hostname since we're expecting it to be a valid one
hostname=self.host,
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
secret=self.pprint(
@ -232,7 +233,7 @@ class NotifyPushjet(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
Syntax:
pjet://hostname/secret_key
@ -252,7 +253,6 @@ class NotifyPushjet(NotifyBase):
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -532,11 +532,10 @@ class NotifyPushover(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -313,7 +313,8 @@ class NotifyRocketChat(NotifyBase):
return '{schema}://{auth}{hostname}{port}/{targets}/?{params}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=NotifyRocketChat.quote(self.host, safe=''),
# never encode hostname since we're expecting it to be a valid one
hostname=self.host,
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
targets='/'.join(
@ -636,7 +637,7 @@ class NotifyRocketChat(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
@ -668,7 +669,6 @@ class NotifyRocketChat(NotifyBase):
)
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -302,12 +302,11 @@ class NotifyRyver(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -605,11 +605,10 @@ class NotifySNS(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -272,7 +272,8 @@ class NotifySendGrid(NotifyBase):
return '{schema}://{apikey}:{from_email}/{targets}?{params}'.format(
schema=self.secure_protocol,
apikey=self.pprint(self.apikey, privacy, safe=''),
from_email=self.quote(self.from_email, safe='@'),
# never encode email since it plays a huge role in our hostname
from_email=self.from_email,
targets='' if not has_targets else '/'.join(
[NotifySendGrid.quote(x, safe='') for x in self.targets]),
params=NotifySendGrid.urlencode(params),
@ -401,7 +402,7 @@ class NotifySendGrid(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""

View File

@ -314,10 +314,10 @@ class NotifySimplePush(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -422,7 +422,7 @@ class NotifySinch(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)

View File

@ -692,7 +692,7 @@ class NotifySlack(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)

View File

@ -347,7 +347,7 @@ class NotifySpontit(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""

View File

@ -254,11 +254,12 @@ class NotifySyslog(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results
# if specified; save hostname into facility

View File

@ -199,11 +199,10 @@ class NotifyTechulusPush(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -688,7 +688,7 @@ class NotifyTelegram(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
# This is a dirty hack; but it's the only work around to tgram://
@ -721,17 +721,14 @@ class NotifyTelegram(NotifyBase):
tgram.group('protocol'),
tgram.group('prefix'),
tgram.group('btoken_a'),
tgram.group('remaining')))
tgram.group('remaining')), verify_host=False)
else:
# Try again
results = NotifyBase.parse_url(
'%s%s/%s' % (
tgram.group('protocol'),
tgram.group('btoken_a'),
tgram.group('remaining'),
),
)
results = NotifyBase.parse_url('%s%s/%s' % (
tgram.group('protocol'),
tgram.group('btoken_a'),
tgram.group('remaining')), verify_host=False)
# The first token is stored in the hostname
bot_token_a = NotifyTelegram.unquote(results['host'])

View File

@ -385,7 +385,7 @@ class NotifyTwilio(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)

View File

@ -726,11 +726,10 @@ class NotifyTwist(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -609,11 +609,10 @@ class NotifyTwitter(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -222,11 +222,10 @@ class NotifyWebexTeams(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -91,7 +91,7 @@ class NotifyWindows(NotifyBase):
# Define object templates
templates = (
'{schema}://_/',
'{schema}://',
)
# Define our template arguments
@ -232,7 +232,7 @@ class NotifyWindows(NotifyBase):
# Extend our parameters
params.update(self.url_parameters(privacy=privacy, *args, **kwargs))
return '{schema}://_/?{params}'.format(
return '{schema}://?{params}'.format(
schema=self.protocol,
params=NotifyWindows.urlencode(params),
)
@ -246,19 +246,7 @@ class NotifyWindows(NotifyBase):
"""
results = NotifyBase.parse_url(url)
if not results:
results = {
'schema': NotifyWindows.protocol,
'user': None,
'password': None,
'port': None,
'host': '_',
'fullpath': None,
'path': None,
'url': url,
'qsd': {},
}
results = NotifyBase.parse_url(url, verify_host=False)
# Include images with our message
results['include_image'] = \

View File

@ -73,9 +73,6 @@ class NotifyXBMC(NotifyBase):
# Allows the user to specify the NotifyImageSize object
image_size = NotifyImageSize.XY_128
# The number of seconds to display the popup for
default_popup_duration_sec = 12
# XBMC default protocol version (v2)
xbmc_remote_protocol = 2
@ -137,8 +134,9 @@ class NotifyXBMC(NotifyBase):
super(NotifyXBMC, self).__init__(**kwargs)
# Number of seconds to display notification for
self.duration = self.default_popup_duration_sec \
if not (isinstance(duration, int) and duration > 0) else duration
self.duration = self.template_args['duration']['default'] \
if not (isinstance(duration, int) and
self.template_args['duration']['min'] > 0) else duration
# Build our schema
self.schema = 'https' if self.secure else 'http'
@ -335,7 +333,8 @@ class NotifyXBMC(NotifyBase):
return '{schema}://{auth}{hostname}{port}/?{params}'.format(
schema=default_schema,
auth=auth,
hostname=NotifyXBMC.quote(self.host, safe=''),
# never encode hostname since we're expecting it to be a valid one
hostname=self.host,
port='' if not self.port or self.port == default_port
else ':{}'.format(self.port),
params=NotifyXBMC.urlencode(params),
@ -345,7 +344,7 @@ class NotifyXBMC(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)

View File

@ -167,7 +167,8 @@ class NotifyXML(NotifyBase):
return '{schema}://{auth}{hostname}{port}{fullpath}/?{params}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=NotifyXML.quote(self.host, safe=''),
# never encode hostname since we're expecting it to be a valid one
hostname=self.host,
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
fullpath=NotifyXML.quote(self.fullpath, safe='/'),
@ -267,11 +268,10 @@ class NotifyXML(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -306,7 +306,8 @@ class NotifyXMPP(NotifyBase):
return '{schema}://{auth}@{hostname}{port}/{jids}?{params}'.format(
auth=auth,
schema=default_schema,
hostname=NotifyXMPP.quote(self.host, safe=''),
# never encode hostname since we're expecting it to be a valid one
hostname=self.host,
port='' if not self.port or self.port == default_port
else ':{}'.format(self.port),
jids=jids,
@ -317,11 +318,10 @@ class NotifyXMPP(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -352,11 +352,10 @@ class NotifyZulip(NotifyBase):
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results

View File

@ -127,52 +127,92 @@ URL_DETECTION_RE = re.compile(
REGEX_VALIDATE_LOOKUP = {}
def is_hostname(hostname):
"""
Validate hostname
"""
if len(hostname) > 255 or len(hostname) == 0:
return False
if hostname[-1] == ".":
hostname = hostname[:-1]
allowed = re.compile(r'(?!-)[A-Z\d_-]{1,63}(?<!-)$', re.IGNORECASE)
return all(allowed.match(x) for x in hostname.split("."))
def is_ipaddr(addr):
def is_ipaddr(addr, ipv4=True, ipv6=True):
"""
Validates against IPV4 and IPV6 IP Addresses
"""
# Based on https://stackoverflow.com/questions/5284147/\
# validating-ipv4-addresses-with-regexp
re_ipv4 = re.compile(
r'^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}'
r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'
if ipv4:
# Based on https://stackoverflow.com/questions/5284147/\
# validating-ipv4-addresses-with-regexp
re_ipv4 = re.compile(
r'^(?P<ip>((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}'
r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))$'
)
match = re_ipv4.match(addr)
if match is not None:
# Return our matched IP
return match.group('ip')
if ipv6:
# Based on https://stackoverflow.com/questions/53497/\
# regular-expression-that-matches-valid-ipv6-addresses
#
# IPV6 URLs should be enclosed in square brackets when placed on a URL
# Source: https://tools.ietf.org/html/rfc2732
# - For this reason, they are additionally checked for existance
re_ipv6 = re.compile(
r'\[?(?P<ip>(([0-9a-f]{1,4}:){7,7}[0-9a-f]{1,4}|([0-9a-f]{1,4}:)'
r'{1,7}:|([0-9a-f]{1,4}:){1,6}:[0-9a-f]{1,4}|([0-9a-f]{1,4}:){1,5}'
r'(:[0-9a-f]{1,4}){1,2}|([0-9a-f]{1,4}:){1,4}'
r'(:[0-9a-f]{1,4}){1,3}|([0-9a-f]{1,4}:){1,3}'
r'(:[0-9a-f]{1,4}){1,4}|([0-9a-f]{1,4}:){1,2}'
r'(:[0-9a-f]{1,4}){1,5}|[0-9a-f]{1,4}:'
r'((:[0-9a-f]{1,4}){1,6})|:((:[0-9a-f]{1,4}){1,7}|:)|'
r'fe80:(:[0-9a-f]{0,4}){0,4}%[0-9a-z]{1,}|::'
r'(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]'
r'|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|'
r'1{0,1}[0-9]){0,1}[0-9])|([0-9a-f]{1,4}:){1,4}:((25[0-5]|'
r'(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|'
r'1{0,1}[0-9]){0,1}[0-9])))\]?', re.I,
)
match = re_ipv6.match(addr)
if match is not None:
# Return our matched IP between square brackets since that is
# required for URL formatting as per RFC 2732.
return '[{}]'.format(match.group('ip'))
# There was no match
return False
def is_hostname(hostname, ipv4=True, ipv6=True):
"""
Validate hostname
"""
# The entire hostname, including the delimiting dots, has a maximum of 253
# ASCII characters.
if len(hostname) > 253 or len(hostname) == 0:
return False
# Strip trailling period on hostname (if one exists)
if hostname[-1] == ".":
hostname = hostname[:-1]
# Split our hostname up
labels = hostname.split(".")
# ipv4 check
if len(labels) == 4 and re.match(r'[0-9.]+', hostname):
return is_ipaddr(hostname, ipv4=ipv4, ipv6=False)
# - RFC 1123 permits hostname labels to start with digits
# - digit must be followed by alpha/numeric so we don't end up
# processing IP addresses here
# - Hostnames can ony be comprised of alpha-numeric characters and the
# hyphen (-) character.
# - Hostnames can not start with the hyphen (-) character.
# - labels can not exceed 63 characters
allowed = re.compile(
r'(?!-)[a-z0-9][a-z0-9-]{1,62}(?<!-)$',
re.IGNORECASE,
)
# Based on https://stackoverflow.com/questions/53497/\
# regular-expression-that-matches-valid-ipv6-addresses
re_ipv6 = re.compile(
r'(([0-9a-f]{1,4}:){7,7}[0-9a-f]{1,4}|([0-9a-f]{1,4}:){1,7}:'
r'|([0-9a-f]{1,4}:){1,6}:[0-9a-f]{1,4}|([0-9a-f]{1,4}:){1,5}'
r'(:[0-9a-f]{1,4}){1,2}|([0-9a-f]{1,4}:){1,4}'
r'(:[0-9a-f]{1,4}){1,3}|([0-9a-f]{1,4}:){1,3}'
r'(:[0-9a-f]{1,4}){1,4}|([0-9a-f]{1,4}:){1,2}'
r'(:[0-9a-f]{1,4}){1,5}|[0-9a-f]{1,4}:'
r'((:[0-9a-f]{1,4}){1,6})|:((:[0-9a-f]{1,4}){1,7}|:)|'
r'fe80:(:[0-9a-f]{0,4}){0,4}%[0-9a-z]{1,}|::'
r'(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]'
r'|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|'
r'1{0,1}[0-9]){0,1}[0-9])|([0-9a-f]{1,4}:){1,4}:((25[0-5]|'
r'(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|'
r'1{0,1}[0-9]){0,1}[0-9]))', re.I,
)
if not all(allowed.match(x) for x in labels):
return is_ipaddr(hostname, ipv4=ipv4, ipv6=ipv6)
# Returns true if we match an IP and/or
return (re_ipv4.match(addr) is not None or re_ipv6.match(addr) is not None)
return hostname
def is_email(address):
@ -419,30 +459,22 @@ def parse_url(url, default_schema='http', verify_host=True):
# and it's already assigned
pass
try:
(result['host'], result['port']) = \
re.split(r'[:]+', result['host'])[:2]
# Max port is 65535 so (1,5 digits)
match = re.search(
r'^(?P<host>.+):(?P<port>[1-9][0-9]{0,4})$', result['host'])
if match:
# Separate our port from our hostname (if port is detected)
result['host'] = match.group('host')
result['port'] = int(match.group('port'))
except ValueError:
# no problem then, user only exists
# and it's already assigned
pass
if result['port']:
try:
result['port'] = int(result['port'])
except (ValueError, TypeError):
# Invalid Port Specified
if verify_host:
# Verify and Validate our hostname
result['host'] = is_hostname(result['host'])
if not result['host']:
# Nothing more we can do without a hostname; give the user
# some indication as to what went wrong
return None
if result['port'] == 0:
result['port'] = None
if verify_host and not is_hostname(result['host']):
# Nothing more we can do without a hostname
return None
# Re-assemble cleaned up version of the url
result['url'] = '%s://' % result['schema']
if isinstance(result['user'], six.string_types):

View File

@ -163,9 +163,6 @@ def test_dbus_plugin(mock_mainloop, mock_byte, mock_bytearray,
with pytest.raises(TypeError):
apprise.plugins.NotifyDBus(**{'schema': 'invalid'})
# Invalid URLs
assert apprise.plugins.NotifyDBus.parse_url('') is None
# Set our X and Y coordinate and try the notification
assert apprise.plugins.NotifyDBus(
x_axis=0, y_axis=0, **{'schema': 'dbus'})\

View File

@ -261,7 +261,11 @@ TEST_URLS = (
# NotifyDiscord
##################################
('discord://', {
'instance': None,
'instance': TypeError,
}),
# An invalid url
('discord://:@/', {
'instance': TypeError,
}),
# No webhook_token specified
('discord://%s' % ('i' * 24), {
@ -357,10 +361,6 @@ TEST_URLS = (
# don't include an image by default
'include_image': False,
}),
# An invalid url
('discord://:@/', {
'instance': None,
}),
('discord://%s/%s/' % ('a' * 24, 'b' * 64), {
'instance': plugins.NotifyDiscord,
# force a failure
@ -572,7 +572,10 @@ TEST_URLS = (
# NotifyFaast
##################################
('faast://', {
'instance': None,
'instance': TypeError,
}),
('faast://:@/', {
'instance': TypeError,
}),
# Auth Token specified
('faast://%s' % ('a' * 32), {
@ -586,9 +589,6 @@ TEST_URLS = (
# don't include an image by default
'include_image': False,
}),
('faast://:@/', {
'instance': None,
}),
('faast://%s' % ('a' * 32), {
'instance': plugins.NotifyFaast,
# force a failure
@ -613,7 +613,11 @@ TEST_URLS = (
##################################
# No token specified
('flock://', {
'instance': None,
'instance': TypeError,
}),
# An invalid url
('flock://:@/', {
'instance': TypeError,
}),
# Provide a token
('flock://%s' % ('t' * 24), {
@ -696,10 +700,6 @@ TEST_URLS = (
# We will still instantiate the object
'instance': plugins.NotifyFlock,
}),
# An invalid url
('flock://:@/', {
'instance': None,
}),
# Error Testing
('flock://%s/g:%s/u:%s?format=text' % ('i' * 24, 'g' * 12, 'u' * 10), {
'instance': plugins.NotifyFlock,
@ -730,10 +730,10 @@ TEST_URLS = (
# NotifyGitter
##################################
('gitter://', {
'instance': None,
'instance': TypeError,
}),
('gitter://:@/', {
'instance': None,
'instance': TypeError,
}),
# Invalid Token Length
('gitter://%s' % ('a' * 12), {
@ -853,15 +853,15 @@ TEST_URLS = (
# NotifyIFTTT - If This Than That
##################################
('ifttt://', {
'instance': None,
'instance': TypeError,
}),
('ifttt://:@/', {
'instance': TypeError,
}),
# No User
('ifttt://EventID/', {
'instance': TypeError,
}),
('ifttt://:@/', {
'instance': None,
}),
# A nicely formed ifttt url with 1 event and a new key/value store
('ifttt://WebHookID@EventID/?+TemplateKey=TemplateVal', {
'instance': plugins.NotifyIFTTT,
@ -917,7 +917,11 @@ TEST_URLS = (
# NotifyJoin
##################################
('join://', {
'instance': None,
'instance': TypeError,
}),
# API Key + bad url
('join://:@/', {
'instance': TypeError,
}),
# APIkey; no device
('join://%s' % ('a' * 32), {
@ -970,10 +974,6 @@ TEST_URLS = (
('join://%s/%s/%s' % ('a' * 32, 'd' * 32, 'group.chrome'), {
'instance': plugins.NotifyJoin,
}),
# API Key + bad url
('join://:@/', {
'instance': None,
}),
('join://%s' % ('a' * 32), {
'instance': plugins.NotifyJoin,
# force a failure
@ -1134,6 +1134,14 @@ TEST_URLS = (
('kodi://localhost', {
'instance': plugins.NotifyXBMC,
}),
('kodi://192.168.4.1', {
# Support IPv4 Addresses
'instance': plugins.NotifyXBMC,
}),
('kodi://[2001:db8:002a:3256:adfe:05c0:0003:0006]', {
# Support IPv6 Addresses
'instance': plugins.NotifyXBMC,
}),
('kodi://user:pass@localhost', {
'instance': plugins.NotifyXBMC,
@ -1203,14 +1211,14 @@ TEST_URLS = (
##################################
('kumulos://', {
# No API or Server Key specified
'instance': None,
'instance': TypeError,
}),
('kumulos://:@/', {
# No API or Server Key specified
# We don't have strict host checking on for kumulos, so this URL
# actually becomes parseable and :@ becomes a hostname.
# The below errors because a second token wasn't found
'instance': None,
'instance': TypeError,
}),
('kumulos://{}/'.format(UUID4), {
# No server key was specified
@ -1410,10 +1418,10 @@ TEST_URLS = (
# NotifyMailgun
##################################
('mailgun://', {
'instance': None,
'instance': TypeError,
}),
('mailgun://:@/', {
'instance': None,
'instance': TypeError,
}),
# No Token specified
('mailgun://user@localhost.localdomain', {
@ -1672,7 +1680,7 @@ TEST_URLS = (
# Our expected url(privacy=True) startswith() response:
'privacy_url': 'mmost://localhost:8080/3...4/',
}),
('mmost://localhost:0/3ccdd113474722377935511fc85d3dd4', {
('mmost://localhost:8080/3ccdd113474722377935511fc85d3dd4', {
'instance': plugins.NotifyMatterMost,
}),
('mmost://localhost:invalid-port/3ccdd113474722377935511fc85d3dd4', {
@ -1946,10 +1954,10 @@ TEST_URLS = (
# NotifyNotifico
##################################
('notifico://', {
'instance': None,
'instance': TypeError,
}),
('notifico://:@/', {
'instance': None,
'instance': TypeError,
}),
('notifico://1234', {
# Just a project id provided (no message token)
@ -2219,7 +2227,11 @@ TEST_URLS = (
# NotifyProwl
##################################
('prowl://', {
'instance': None,
'instance': TypeError,
}),
# bad url
('prowl://:@/', {
'instance': TypeError,
}),
# Invalid API Key
('prowl://%s' % ('a' * 20), {
@ -2273,10 +2285,6 @@ TEST_URLS = (
('prowl://%s' % ('a' * 40), {
'instance': plugins.NotifyProwl,
}),
# bad url
('prowl://:@/', {
'instance': None,
}),
('prowl://%s' % ('a' * 40), {
'instance': plugins.NotifyProwl,
# force a failure
@ -2300,10 +2308,10 @@ TEST_URLS = (
# NotifyPushBullet
##################################
('pbul://', {
'instance': None,
'instance': TypeError,
}),
('pbul://:@/', {
'instance': None,
'instance': TypeError,
}),
# APIkey
('pbul://%s' % ('a' * 32), {
@ -2439,13 +2447,13 @@ TEST_URLS = (
# NotifyPushSafer
##################################
('psafer://:@/', {
'instance': None,
'instance': TypeError,
}),
('psafer://', {
'instance': None,
'instance': TypeError,
}),
('psafers://', {
'instance': None,
'instance': TypeError,
}),
('psafer://{}'.format('a' * 20), {
'instance': plugins.NotifyPushSafer,
@ -2648,12 +2656,16 @@ TEST_URLS = (
# NotifyPushed
##################################
('pushed://', {
'instance': None,
'instance': TypeError,
}),
# Application Key Only
('pushed://%s' % ('a' * 32), {
'instance': TypeError,
}),
# Invalid URL
('pushed://:@/', {
'instance': TypeError,
}),
# Application Key+Secret
('pushed://%s/%s' % ('a' * 32, 'a' * 64), {
'instance': plugins.NotifyPushed,
@ -2708,9 +2720,6 @@ TEST_URLS = (
# is set and tests that we gracfully handle them
'test_requests_exceptions': True,
}),
('pushed://:@/', {
'instance': None,
}),
('pushed://%s/%s' % ('a' * 32, 'a' * 64), {
'instance': plugins.NotifyPushed,
# force a failure
@ -2810,7 +2819,11 @@ TEST_URLS = (
# NotifyPushover
##################################
('pover://', {
'instance': None,
'instance': TypeError,
}),
# bad url
('pover://:@/', {
'instance': TypeError,
}),
# APIkey; no user
('pover://%s' % ('a' * 30), {
@ -2906,10 +2919,6 @@ TEST_URLS = (
('pover://%s@%s?priority=' % ('u' * 30, 'a' * 30), {
'instance': plugins.NotifyPushover,
}),
# bad url
('pover://:@/', {
'instance': None,
}),
('pover://%s@%s' % ('u' * 30, 'a' * 30), {
'instance': plugins.NotifyPushover,
# force a failure
@ -3115,10 +3124,10 @@ TEST_URLS = (
# NotifyRyver
##################################
('ryver://', {
'instance': None,
'instance': TypeError,
}),
('ryver://:@/', {
'instance': None,
'instance': TypeError,
}),
('ryver://apprise', {
# Just org provided (no token)
@ -3435,7 +3444,7 @@ TEST_URLS = (
##################################
('spush://', {
# No api key
'instance': None,
'instance': TypeError,
}),
('spush://{}'.format('A' * 14), {
# API Key specified however expected server response
@ -3658,10 +3667,10 @@ TEST_URLS = (
# NotifySNS (AWS)
##################################
('sns://', {
'instance': None,
'instance': TypeError,
}),
('sns://:@/', {
'instance': None,
'instance': TypeError,
}),
('sns://T1JJ3T3L2', {
# Just Token 1 provided
@ -4132,11 +4141,11 @@ TEST_URLS = (
##################################
('msg91://', {
# No hostname/authkey specified
'instance': None,
'instance': TypeError,
}),
('msg91://-', {
# Invalid AuthKey
'instance': None,
'instance': TypeError,
}),
('msg91://{}'.format('a' * 23), {
# No number specified
@ -4204,7 +4213,7 @@ TEST_URLS = (
##################################
('msgbird://', {
# No hostname/apikey specified
'instance': None,
'instance': TypeError,
}),
('msgbird://{}/abcd'.format('a' * 25), {
# invalid characters in source phone number
@ -4256,7 +4265,7 @@ TEST_URLS = (
##################################
('popcorn://', {
# No hostname/apikey specified
'instance': None,
'instance': TypeError,
}),
('popcorn://{}/18001231234'.format('_' * 9), {
# invalid apikey
@ -4490,10 +4499,10 @@ TEST_URLS = (
# NotifyZulip
##################################
('zulip://', {
'instance': None,
'instance': TypeError,
}),
('zulip://:@/', {
'instance': None,
'instance': TypeError,
}),
('zulip://apprise', {
# Just org provided (no token or botname)
@ -5010,12 +5019,10 @@ def test_notify_emby_plugin_login(mock_post, mock_get):
mock_post.return_value.status_code = requests.codes.ok
mock_get.return_value.status_code = requests.codes.ok
obj = Apprise.instantiate('emby://l2g:l2gpass@localhost:%d' % (
# Increment our port so it will always be something different than
# the default
plugins.NotifyEmby.emby_default_port + 1))
obj = Apprise.instantiate('emby://l2g:l2gpass@localhost:1234')
# Set a different port (outside of default)
assert isinstance(obj, plugins.NotifyEmby)
assert obj.port == (plugins.NotifyEmby.emby_default_port + 1)
assert obj.port == 1234
# The login will fail because '' is not a parseable JSON response
assert obj.login() is False
@ -5024,10 +5031,10 @@ def test_notify_emby_plugin_login(mock_post, mock_get):
obj.port = None
assert obj.login() is False
# Default port assigments
# Default port assignments
obj = Apprise.instantiate('emby://l2g:l2gpass@localhost')
assert isinstance(obj, plugins.NotifyEmby)
assert obj.port == plugins.NotifyEmby.emby_default_port
assert obj.port == 8096
# The login will (still) fail because '' is not a parseable JSON response
assert obj.login() is False

View File

@ -92,16 +92,38 @@ def test_parse_url():
assert result['qsd-'] == {}
assert result['qsd+'] == {}
result = utils.parse_url('hostname')
# colon after hostname without port number is no good
assert utils.parse_url('http://hostname:') is None
# However if we don't verify the host, it is okay
result = utils.parse_url('http://hostname:', verify_host=False)
assert result['schema'] == 'http'
assert result['host'] == 'hostname'
assert result['host'] == 'hostname:'
assert result['port'] is None
assert result['user'] is None
assert result['password'] is None
assert result['fullpath'] is None
assert result['path'] is None
assert result['query'] is None
assert result['url'] == 'http://hostname'
assert result['url'] == 'http://hostname:'
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
# A port of Zero is not valid
assert utils.parse_url('http://hostname:0') is None
# Port set to zero; port is not stored
result = utils.parse_url('http://hostname:0', verify_host=False)
assert result['schema'] == 'http'
assert result['host'] == 'hostname:0'
assert result['port'] is None
assert result['user'] is None
assert result['password'] is None
assert result['fullpath'] is None
assert result['path'] is None
assert result['query'] is None
assert result['url'] == 'http://hostname:0'
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
@ -314,22 +336,6 @@ def test_parse_url():
assert utils.parse_url('?') is None
assert utils.parse_url('/') is None
# A default port of zero is still considered valid, but
# is removed in the response.
result = utils.parse_url('http://nuxref.com:0')
assert result['schema'] == 'http'
assert result['host'] == 'nuxref.com'
assert result['port'] is None
assert result['user'] is None
assert result['password'] is None
assert result['fullpath'] is None
assert result['path'] is None
assert result['query'] is None
assert result['url'] == 'http://nuxref.com'
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
# Test some illegal strings
result = utils.parse_url(object, verify_host=False)
assert result is None
@ -475,16 +481,55 @@ def test_is_hostname():
"""
# Valid Hostnames
assert utils.is_hostname('yahoo.ca') is True
assert utils.is_hostname('yahoo.ca.') is True
assert utils.is_hostname('valid-dashes-in-host.ca') is True
assert utils.is_hostname('valid-underscores_in_host.ca') is True
assert utils.is_hostname('yahoo.ca') == 'yahoo.ca'
assert utils.is_hostname('yahoo.ca.') == 'yahoo.ca'
assert utils.is_hostname('valid-dashes-in-host.ca') == \
'valid-dashes-in-host.ca'
# Invalid Hostnames
assert utils.is_hostname('-hostname.that.starts.with.a.dash') is False
assert utils.is_hostname('invalid-characters_#^.ca') is False
assert utils.is_hostname(' spaces ') is False
assert utils.is_hostname(' ') is False
assert utils.is_hostname('') is False
assert utils.is_hostname('valid-underscores_in_host.ca') is False
# Valid IPv4 Addresses
assert utils.is_hostname('127.0.0.1') == '127.0.0.1'
assert utils.is_hostname('0.0.0.0') == '0.0.0.0'
assert utils.is_hostname('255.255.255.255') == '255.255.255.255'
# But not if we're not checking for this:
assert utils.is_hostname('127.0.0.1', ipv4=False) is False
assert utils.is_hostname('0.0.0.0', ipv4=False) is False
assert utils.is_hostname('255.255.255.255', ipv4=False) is False
# Invalid IPv4 Addresses
assert utils.is_hostname('1.2.3') is False
assert utils.is_hostname('256.256.256.256') is False
assert utils.is_hostname('999.0.0.0') is False
assert utils.is_hostname('1.2.3.4.5') is False
assert utils.is_hostname(' 127.0.0.1 ') is False
assert utils.is_hostname(' ') is False
assert utils.is_hostname('') is False
# Valid IPv6 Addresses (square brakets supported for URL construction)
assert utils.is_hostname('[2001:0db8:85a3:0000:0000:8a2e:0370:7334]') == \
'[2001:0db8:85a3:0000:0000:8a2e:0370:7334]'
assert utils.is_hostname('2001:0db8:85a3:0000:0000:8a2e:0370:7334') == \
'[2001:0db8:85a3:0000:0000:8a2e:0370:7334]'
assert utils.is_hostname('[2001:db8:002a:3256:adfe:05c0:0003:0006]') == \
'[2001:db8:002a:3256:adfe:05c0:0003:0006]'
# localhost
assert utils.is_hostname('::1') == '[::1]'
assert utils.is_hostname('0:0:0:0:0:0:0:1') == '[0:0:0:0:0:0:0:1]'
# But not if we're not checking for this:
assert utils.is_hostname(
'[2001:0db8:85a3:0000:0000:8a2e:0370:7334]', ipv6=False) is False
assert utils.is_hostname(
'2001:0db8:85a3:0000:0000:8a2e:0370:7334', ipv6=False) is False
def test_is_ipaddr():
@ -493,9 +538,9 @@ def test_is_ipaddr():
"""
# Valid IPv4 Addresses
assert utils.is_ipaddr('127.0.0.1') is True
assert utils.is_ipaddr('0.0.0.0') is True
assert utils.is_ipaddr('255.255.255.255') is True
assert utils.is_ipaddr('127.0.0.1') == '127.0.0.1'
assert utils.is_ipaddr('0.0.0.0') == '0.0.0.0'
assert utils.is_ipaddr('255.255.255.255') == '255.255.255.255'
# Invalid IPv4 Addresses
assert utils.is_ipaddr('1.2.3') is False
@ -506,8 +551,17 @@ def test_is_ipaddr():
assert utils.is_ipaddr(' ') is False
assert utils.is_ipaddr('') is False
# Valid IPv6 Addresses
assert utils.is_ipaddr('2001:0db8:85a3:0000:0000:8a2e:0370:7334') is True
# Valid IPv6 Addresses (square brakets supported for URL construction)
assert utils.is_ipaddr('[2001:0db8:85a3:0000:0000:8a2e:0370:7334]') == \
'[2001:0db8:85a3:0000:0000:8a2e:0370:7334]'
assert utils.is_ipaddr('2001:0db8:85a3:0000:0000:8a2e:0370:7334') == \
'[2001:0db8:85a3:0000:0000:8a2e:0370:7334]'
assert utils.is_ipaddr('[2001:db8:002a:3256:adfe:05c0:0003:0006]') == \
'[2001:db8:002a:3256:adfe:05c0:0003:0006]'
# localhost
assert utils.is_ipaddr('::1') == '[::1]'
assert utils.is_ipaddr('0:0:0:0:0:0:0:1') == '[0:0:0:0:0:0:0:1]'
def test_is_email():