mirror of https://github.com/caronc/apprise
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
363 lines
13 KiB
363 lines
13 KiB
# -*- coding: utf-8 -*-
|
|
# BSD 2-Clause License
|
|
#
|
|
# Apprise - Push Notification Library.
|
|
# Copyright (c) 2024, Chris Caron <lead2gold@gmail.com>
|
|
#
|
|
# Redistribution and use in source and binary forms, with or without
|
|
# modification, are permitted provided that the following conditions are met:
|
|
#
|
|
# 1. Redistributions of source code must retain the above copyright notice,
|
|
# this list of conditions and the following disclaimer.
|
|
#
|
|
# 2. Redistributions in binary form must reproduce the above copyright notice,
|
|
# this list of conditions and the following disclaimer in the documentation
|
|
# and/or other materials provided with the distribution.
|
|
#
|
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
|
|
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
# POSSIBILITY OF SUCH DAMAGE.
|
|
|
|
import pytest
|
|
from datetime import datetime
|
|
from datetime import timedelta
|
|
|
|
from apprise.plugins import NotifyBase
|
|
from apprise import NotifyType
|
|
from apprise import NotifyImageSize
|
|
from timeit import default_timer
|
|
|
|
# Disable logging for a cleaner testing output
|
|
import logging
|
|
logging.disable(logging.CRITICAL)
|
|
|
|
|
|
def test_notify_base():
    """
    API: NotifyBase() object

    Exercises the NotifyBase constructor arguments, the throttle() timing
    behaviour, the image/color/ascii look-ups and the static parsing and
    escaping helpers.

    """

    # An invalid notification format throws an exception
    with pytest.raises(TypeError):
        NotifyBase(**{'format': 'invalid'})

    # An invalid overflow mode throws an exception
    with pytest.raises(TypeError):
        NotifyBase(**{'overflow': 'invalid'})

    # Bad port information
    nb = NotifyBase(port='invalid')
    assert nb.port is None

    nb = NotifyBase(port=10)
    assert nb.port == 10

    assert isinstance(nb.url(), str)
    assert str(nb) == nb.url()

    with pytest.raises(NotImplementedError):
        # Each sub-module that inherits this as a parent is required to
        # over-ride this function. So direct calls to this throw a not
        # implemented error intentionally
        nb.send('test message')

    # Throttle overrides..
    nb = NotifyBase()
    nb.request_rate_per_sec = 0.0
    start_time = default_timer()
    nb.throttle()
    elapsed = default_timer() - start_time
    # Should be a very fast response time since we set the rate to zero but
    # we'll check for less than 500ms to be fair as some testing systems may
    # be slower than others
    assert elapsed < 0.5

    # Subsequent calls should achieve the same response
    start_time = default_timer()
    nb.throttle()
    elapsed = default_timer() - start_time
    assert elapsed < 0.5

    nb = NotifyBase()
    nb.request_rate_per_sec = 1.0

    # Set our time to now
    start_time = default_timer()
    nb.throttle()
    elapsed = default_timer() - start_time
    # A first call to throttle (without telling it a time previously ran)
    # does not block for any length of time; it just merely sets us up for
    # subsequent calls to block
    assert elapsed < 0.5

    # Subsequent calls could take up to the rate_per_sec though...
    start_time = default_timer()
    nb.throttle(last_io=datetime.now())
    elapsed = default_timer() - start_time
    assert 0.48 < elapsed < 1.5

    nb = NotifyBase()
    nb.request_rate_per_sec = 1.0

    # Set our time to now
    start_time = default_timer()
    nb.throttle(last_io=datetime.now())
    elapsed = default_timer() - start_time
    # because we told it that we had already done a previous action (now)
    # the throttle holds out until the right time has passed
    assert 0.48 < elapsed < 1.5

    # Subsequent calls could take up to the rate_per_sec though...
    start_time = default_timer()
    nb.throttle(last_io=datetime.now())
    elapsed = default_timer() - start_time
    assert 0.48 < elapsed < 1.5

    nb = NotifyBase()
    start_time = default_timer()
    nb.request_rate_per_sec = 1.0
    # Force a time in the past
    nb.throttle(last_io=(datetime.now() - timedelta(seconds=20)))
    elapsed = default_timer() - start_time
    # The previous action is reported as 20 seconds ago (far longer than
    # the 1 second rate), so no blocking is expected; we check for less
    # than 500ms to be fair as some testing systems may be slower than
    # others
    assert elapsed < 0.5

    # Force a throttle time
    start_time = default_timer()
    nb.throttle(wait=0.5)
    elapsed = default_timer() - start_time
    assert 0.48 < elapsed < 1.5

    # our NotifyBase wasn't initialized with an ImageSize so these all
    # return None
    assert nb.image_url(notify_type=NotifyType.INFO) is None
    assert nb.image_path(notify_type=NotifyType.INFO) is None
    assert nb.image_raw(notify_type=NotifyType.INFO) is None

    # Color handling
    assert nb.color(notify_type='invalid') is None
    assert isinstance(
        nb.color(notify_type=NotifyType.INFO, color_type=None),
        str)
    assert isinstance(
        nb.color(notify_type=NotifyType.INFO, color_type=int), int)
    assert isinstance(
        nb.color(notify_type=NotifyType.INFO, color_type=tuple), tuple)

    # Ascii Handling
    assert nb.ascii(notify_type='invalid') is None
    assert nb.ascii(NotifyType.INFO) == '[i]'
    assert nb.ascii(NotifyType.SUCCESS) == '[+]'
    assert nb.ascii(NotifyType.WARNING) == '[~]'
    assert nb.ascii(NotifyType.FAILURE) == '[!]'

    # Create an object
    nb = NotifyBase()
    # Force an image size since the default doesn't have one
    nb.image_size = NotifyImageSize.XY_256

    # We'll get an object this time around
    assert nb.image_url(notify_type=NotifyType.INFO) is not None
    assert nb.image_path(notify_type=NotifyType.INFO) is not None
    assert nb.image_raw(notify_type=NotifyType.INFO) is not None

    # But we will not get a response with an invalid notification type
    assert nb.image_url(notify_type='invalid') is None
    assert nb.image_path(notify_type='invalid') is None
    assert nb.image_raw(notify_type='invalid') is None

    # Static function testing; the expected values are the HTML-escaped
    # (entity) forms of the input, not the raw characters
    # NOTE(review): the entity spellings below were restored from a
    # rendered copy of this file - confirm against NotifyBase.escape_html()
    assert NotifyBase.escape_html("<content>'\t \n</content>") == \
        '&lt;content&gt;&apos;&emsp;&nbsp;\n&lt;/content&gt;'

    assert NotifyBase.escape_html(
        "<content>'\t \n</content>", convert_new_lines=True) == \
        '&lt;content&gt;&apos;&emsp;&nbsp;<br/>&lt;/content&gt;'

    # split_path: invalid data yields an empty list
    assert NotifyBase.split_path(None) == []
    assert NotifyBase.split_path(object()) == []
    assert NotifyBase.split_path(42) == []

    assert NotifyBase.split_path(
        '/path/?name=Dr%20Disrespect', unquote=False) == \
        ['path', '?name=Dr%20Disrespect']

    assert NotifyBase.split_path(
        '/path/?name=Dr%20Disrespect', unquote=True) == \
        ['path', '?name=Dr Disrespect']

    # a slash found inside the path, if escaped properly will not be broken
    # by split_path while additional concatenated slashes are ignored
    # FYI: %2F = /
    assert NotifyBase.split_path(
        '/%2F///%2F%2F////%2F%2F%2F////', unquote=True) == \
        ['/', '//', '///']

    # parse_list: invalid data yields an empty list
    assert NotifyBase.parse_list(None) == []
    assert NotifyBase.parse_list(object()) == []
    assert NotifyBase.parse_list(42) == []

    result = NotifyBase.parse_list(
        ',path,?name=Dr%20Disrespect', unquote=False)
    assert isinstance(result, list)
    assert len(result) == 2
    assert 'path' in result
    assert '?name=Dr%20Disrespect' in result

    result = NotifyBase.parse_list(',path,?name=Dr%20Disrespect', unquote=True)
    assert isinstance(result, list)
    assert len(result) == 2
    assert 'path' in result
    assert '?name=Dr Disrespect' in result

    # a slash found inside an entry, if escaped properly will not be broken
    # by parse_list while empty entries are ignored
    # FYI: %2F = /
    # In this list there are actually 4 entries, however parse_list
    # eliminates duplicates in addition to unquoting content by default
    result = NotifyBase.parse_list(
        ',%2F,%2F%2F, , , ,%2F%2F%2F, %2F', unquote=True)
    assert isinstance(result, list)
    assert len(result) == 3
    assert '/' in result
    assert '//' in result
    assert '///' in result

    # Phone number parsing
    assert NotifyBase.parse_phone_no(None) == []
    assert NotifyBase.parse_phone_no(object()) == []
    assert NotifyBase.parse_phone_no(42) == []

    result = NotifyBase.parse_phone_no(
        '+1-800-123-1234,(800) 123-4567', unquote=False)
    assert isinstance(result, list)
    assert len(result) == 2
    assert '+1-800-123-1234' in result
    assert '(800) 123-4567' in result

    # %2B == +
    result = NotifyBase.parse_phone_no(
        '%2B1-800-123-1234,%2B1%20800%20123%204567', unquote=True)
    assert isinstance(result, list)
    assert len(result) == 2
    assert '+1-800-123-1234' in result
    assert '+1 800 123 4567' in result

    # Give nothing, get nothing
    assert NotifyBase.escape_html("") == ""
    assert NotifyBase.escape_html(None) == ""
    assert NotifyBase.escape_html(object()) == ""

    # Test quote
    assert NotifyBase.unquote('%20') == ' '
    assert NotifyBase.quote(' ') == '%20'
    assert NotifyBase.unquote(None) == ''
    assert NotifyBase.quote(None) == ''
|
|
|
|
|
|
def test_notify_base_urls():
    """
    API: NotifyBase() URLs

    Table-driven checks of the dictionary returned by
    NotifyBase.parse_url() for the verify, password, format, overflow and
    user options, followed by a set of URLs that must be rejected.

    """

    # The verify switch is used as part of the SSL Verification; by
    # default all SSL sites are verified unless this flag is set to
    # something like 'No', 'False', 'Disabled', etc. Boolean values are
    # pretty forgiving.
    verify_cases = (
        ('https://localhost:8080/?verify=No', False),
        ('https://localhost:8080/?verify=Yes', True),
        # The default is to verify
        ('https://localhost:8080', True),
    )
    for url, expected in verify_cases:
        parsed = NotifyBase.parse_url(url)
        assert 'verify' in parsed
        assert parsed['verify'] is expected

    # Password Handling
    password_cases = (
        # the password embedded in the URL is used when nothing else is set
        ('https://user:pass@localhost', "pass"),
        # pass keyword over-rides the embedded password
        ('https://user:pass@localhost?pass=newpassword', "newpassword"),
        # password keyword can also optionally be used
        ('https://user:pass@localhost?password=passwd', "passwd"),
        # pass= takes priority over password= when both are present
        ('https://user:pass@localhost?pass=pw1&password=pw2', "pw1"),
    )
    for url, expected in password_cases:
        parsed = NotifyBase.parse_url(url)
        assert 'password' in parsed
        assert parsed['password'] == expected

    # Options: an unrecognized format is dropped from the results
    parsed = NotifyBase.parse_url('https://localhost?format=invalid')
    assert 'format' not in parsed

    format_cases = (
        ('https://localhost?format=text', 'text'),
        ('https://localhost?format=markdown', 'markdown'),
        ('https://localhost?format=html', 'html'),
    )
    for url, expected in format_cases:
        parsed = NotifyBase.parse_url(url)
        assert 'format' in parsed
        assert parsed['format'] == expected

    # Options: an unrecognized overflow mode is dropped from the results
    parsed = NotifyBase.parse_url('https://localhost?overflow=invalid')
    assert 'overflow' not in parsed

    overflow_cases = (
        ('https://localhost?overflow=upstream', 'upstream'),
        ('https://localhost?overflow=split', 'split'),
        ('https://localhost?overflow=truncate', 'truncate'),
    )
    for url, expected in overflow_cases:
        parsed = NotifyBase.parse_url(url)
        assert 'overflow' in parsed
        assert parsed['overflow'] == expected

    # User Handling
    user_cases = (
        # the user embedded in the URL is used when nothing else is set
        ('https://user:pass@localhost', "user"),
        # user keyword over-rides the embedded user
        ('https://user:pass@localhost?user=newuser', "newuser"),
    )
    for url, expected in user_cases:
        parsed = NotifyBase.parse_url(url)
        assert 'user' in parsed
        assert parsed['user'] == expected

    # Test invalid urls; each one must be rejected with None
    invalid_urls = (
        'https://:@/',
        'http://:@',
        'http://@',
        'http:///',
        'http://:test/',
        'http://pass:test/',
    )
    for url in invalid_urls:
        assert NotifyBase.parse_url(url) is None
|