You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
apprise/test/test_apprise_cli.py

1769 lines
52 KiB

# -*- coding: utf-8 -*-
# BSD 2-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2024, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import os
import re
from unittest import mock
import pytest
import sys
import requests
import json
from inspect import cleandoc
from os.path import dirname
from os.path import join
from apprise import cli
from apprise import NotifyBase
from apprise import NotificationManager
from click.testing import CliRunner
from apprise.utils import environ
from apprise.locale import gettext_lazy as _
from importlib import reload
# Disable logging for a cleaner testing output
import logging
logging.disable(logging.CRITICAL)
# Grant access to our Notification Manager Singleton
N_MGR = NotificationManager()
def test_apprise_cli_nux_env(tmpdir):
"""
CLI: Nux Environment
"""
class GoodNotification(NotifyBase):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def notify(self, **kwargs):
# Pretend everything is okay (when passing --disable-async)
return True
async def async_notify(self, **kwargs):
# Pretend everything is okay
return True
def url(self, *args, **kwargs):
# Support url()
return 'good://'
class BadNotification(NotifyBase):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
async def async_notify(self, **kwargs):
# Pretend everything is okay
return False
def url(self, *args, **kwargs):
# Support url()
return 'bad://'
# Set up our notification types
N_MGR['good'] = GoodNotification
N_MGR['bad'] = BadNotification
runner = CliRunner()
result = runner.invoke(cli.main)
# no servers specified; we return 1 (non-zero)
assert result.exit_code == 1
result = runner.invoke(cli.main, ['-v'])
assert result.exit_code == 1
result = runner.invoke(cli.main, ['-vv'])
assert result.exit_code == 1
result = runner.invoke(cli.main, ['-vvv'])
assert result.exit_code == 1
result = runner.invoke(cli.main, ['-vvvv'])
assert result.exit_code == 1
# Display version information and exit
result = runner.invoke(cli.main, ['-V'])
assert result.exit_code == 0
result = runner.invoke(cli.main, [
'-t', 'test title',
'-b', 'test body',
'good://localhost',
])
assert result.exit_code == 0
with mock.patch('requests.post') as mock_post:
# Prepare Mock
mock_post.return_value = requests.Request()
mock_post.return_value.status_code = requests.codes.ok
result = runner.invoke(cli.main, [
'-t', 'test title',
'-b', 'test body\\nsNewLine',
# Test using interpret escapes
'-e',
# Use our JSON query
'json://localhost',
])
assert result.exit_code == 0
# Test our call count
assert mock_post.call_count == 1
# Our string is now escaped correctly
json.loads(mock_post.call_args_list[0][1]['data'])\
.get('message', '') == 'test body\nsNewLine'
# Reset
mock_post.reset_mock()
result = runner.invoke(cli.main, [
'-t', 'test title',
'-b', 'test body\\nsNewLine',
# No -e switch at all (so we don't escape the above)
# Use our JSON query
'json://localhost',
])
assert result.exit_code == 0
# Test our call count
assert mock_post.call_count == 1
# Our string is now escaped correctly
json.loads(mock_post.call_args_list[0][1]['data'])\
.get('message', '') == 'test body\\nsNewLine'
# Run in synchronous mode
result = runner.invoke(cli.main, [
'-t', 'test title',
'-b', 'test body',
'good://localhost',
'--disable-async',
])
assert result.exit_code == 0
# Test Debug Mode (--debug)
result = runner.invoke(cli.main, [
'-t', 'test title',
'-b', 'test body',
'good://localhost',
'--debug',
])
assert result.exit_code == 0
# Test Debug Mode (-D)
result = runner.invoke(cli.main, [
'-t', 'test title',
'-b', 'test body',
'good://localhost',
'-D',
])
assert result.exit_code == 0
result = runner.invoke(cli.main, [
'-t', 'test title',
'good://localhost',
], input='test stdin body\n')
assert result.exit_code == 0
# Run in synchronous mode
result = runner.invoke(cli.main, [
'-t', 'test title',
'good://localhost',
'--disable-async',
], input='test stdin body\n')
assert result.exit_code == 0
result = runner.invoke(cli.main, [
'-t', 'test title',
'-b', 'test body',
'bad://localhost',
])
assert result.exit_code == 1
# Run in synchronous mode
result = runner.invoke(cli.main, [
'-t', 'test title',
'-b', 'test body',
'bad://localhost',
'-Da',
])
assert result.exit_code == 1
# Testing with the --dry-run flag reveals a successful response since we
# don't actually execute the bad:// notification; we only display it
result = runner.invoke(cli.main, [
'-t', 'test title',
'-b', 'test body',
'bad://localhost',
'--dry-run',
])
assert result.exit_code == 0
# Write a simple text based configuration file
t = tmpdir.mkdir("apprise-obj").join("apprise")
buf = """
# Include ourselves
include {}
taga,tagb=good://localhost
tagc=good://nuxref.com
""".format(str(t))
t.write(buf)
# This will read our configuration and not send any notices at all
# because we assigned tags to all of our urls and didn't identify
# a specific match below.
# 'include' reference in configuration file would have included the file a
# second time (since recursion default is 1).
result = runner.invoke(cli.main, [
'-b', 'test config',
'--config', str(t),
])
# Even when recursion take place, tags are all honored
# so 2 is returned because nothing was notified
assert result.exit_code == 3
# This will send out 1 notification because our tag matches
# one of the entries above
# translation: has taga
result = runner.invoke(cli.main, [
'-b', 'has taga',
'--config', str(t),
'--tag', 'taga',
])
assert result.exit_code == 0
# Test recursion
result = runner.invoke(cli.main, [
'-t', 'test title',
'-b', 'test body',
'--config', str(t),
'--tag', 'tagc',
# Invalid entry specified for recursion
'-R', 'invalid',
])
assert result.exit_code == 2
result = runner.invoke(cli.main, [
'-t', 'test title',
'-b', 'test body',
'--config', str(t),
'--tag', 'tagc',
# missing entry specified for recursion
'--recursive-depth',
])
assert result.exit_code == 2
result = runner.invoke(cli.main, [
'-t', 'test title',
'-b', 'test body',
'--config', str(t),
'--tag', 'tagc',
# Disable recursion (thus inclusion will be ignored)
'-R', '0',
])
assert result.exit_code == 0
# Test recursion
result = runner.invoke(cli.main, [
'-t', 'test title',
'-b', 'test body',
'--config', str(t),
'--tag', 'tagc',
# Recurse up to 5 times
'--recursion-depth', '5',
])
assert result.exit_code == 0
# This will send out 2 notifications because by specifying 2 tag
# entries, we 'or' them together:
# translation: has taga or tagb or tagd
result = runner.invoke(cli.main, [
'-b', 'has taga OR tagc OR tagd',
'--config', str(t),
'--tag', 'taga',
'--tag', 'tagc',
'--tag', 'tagd',
])
assert result.exit_code == 0
# Write a simple text based configuration file
t = tmpdir.mkdir("apprise-obj2").join("apprise-test2")
buf = """
good://localhost/1
good://localhost/2
good://localhost/3
good://localhost/4
good://localhost/5
myTag=good://localhost/6
"""
t.write(buf)
# This will read our configuration and send a notification to
# the first 5 entries in the list, but not the one that has
# the tag associated with it
result = runner.invoke(cli.main, [
'-b', 'test config',
'--config', str(t),
])
assert result.exit_code == 0
# Test our notification type switch (it defaults to info) so we want to
# try it as a different value. Should return without a problem
result = runner.invoke(cli.main, [
'-b', '# test config',
'--config', str(t),
'-n', 'success',
])
assert result.exit_code == 0
# Test our notification type switch when set to something unsupported
result = runner.invoke(cli.main, [
'-b', 'test config',
'--config', str(t),
'--notification-type', 'invalid',
])
# An error code of 2 is returned if invalid input is specified on the
# command line
assert result.exit_code == 2
# The notification type switch is case-insensitive
result = runner.invoke(cli.main, [
'-b', 'test config',
'--config', str(t),
'--notification-type', 'WARNING',
])
assert result.exit_code == 0
# Test our formatting switch (it defaults to text) so we want to try it as
# a different value. Should return without a problem
result = runner.invoke(cli.main, [
'-b', '# test config',
'--config', str(t),
'-i', 'markdown',
])
assert result.exit_code == 0
# Test our formatting switch when set to something unsupported
result = runner.invoke(cli.main, [
'-b', 'test config',
'--config', str(t),
'--input-format', 'invalid',
])
# An error code of 2 is returned if invalid input is specified on the
# command line
assert result.exit_code == 2
# The formatting switch is not case sensitive
result = runner.invoke(cli.main, [
'-b', '# test config',
'--config', str(t),
'--input-format', 'HTML',
])
assert result.exit_code == 0
# As a way of ensuring we match the first 5 entries, we can run a
# --dry-run against the same result set above and verify the output
result = runner.invoke(cli.main, [
'-b', 'test config',
'--config', str(t),
'--dry-run',
])
assert result.exit_code == 0
lines = re.split(r'[\r\n]', result.output.strip())
# 5 lines of all good:// entries matched + url id underneath
assert len(lines) == 10
# Verify we match against the remaining good:// entries
for i in range(0, 10, 2):
assert lines[i].endswith('good://')
# This will fail because nothing matches mytag. It's case sensitive
# and we would only actually match against myTag
result = runner.invoke(cli.main, [
'-b', 'has mytag',
'--config', str(t),
'--tag', 'mytag',
])
assert result.exit_code == 3
# Same command as the one identified above except we set the --dry-run
# flag. This causes our list of matched results to be printed only.
# However, since we don't match anything; we still fail with a return code
# of 2.
result = runner.invoke(cli.main, [
'-b', 'has mytag',
'--config', str(t),
'--tag', 'mytag',
'--dry-run'
])
assert result.exit_code == 3
# Here is a case where we get what was expected; we also attach a file
result = runner.invoke(cli.main, [
'-b', 'has myTag',
'--config', str(t),
'--attach', join(dirname(__file__), 'var', 'apprise-test.gif'),
'--tag', 'myTag',
])
assert result.exit_code == 0
# Testing with the --dry-run flag reveals the same positive results
# because there was at least one match
result = runner.invoke(cli.main, [
'-b', 'has myTag',
'--config', str(t),
'--tag', 'myTag',
'--dry-run',
])
assert result.exit_code == 0
#
# Test environment variables
#
# Write a simple text based configuration file
t2 = tmpdir.mkdir("apprise-obj-env").join("apprise")
buf = """
# A general one
good://localhost
# A failure (if we use the fail tag)
fail=bad://localhost
# A normal one tied to myTag
myTag=good://nuxref.com
"""
t2.write(buf)
with environ(APPRISE_URLS="good://localhost"):
# This will load okay because we defined the environment
# variable with a valid URL
result = runner.invoke(cli.main, [
'-b', 'test environment',
# Test that we ignore our tag
'--tag', 'mytag',
])
assert result.exit_code == 0
# Same action but without --tag
result = runner.invoke(cli.main, [
'-b', 'test environment',
])
assert result.exit_code == 0
with mock.patch('apprise.cli.DEFAULT_CONFIG_PATHS', []):
with environ(APPRISE_URLS=" "):
# An empty string is not valid and therefore not loaded so the
# below fails. We override the DEFAULT_CONFIG_PATHS because we
# don't want to detect ones loaded on the machine running the unit
# tests
result = runner.invoke(cli.main, [
'-b', 'test environment',
])
assert result.exit_code == 1
with environ(APPRISE_URLS="bad://localhost"):
result = runner.invoke(cli.main, [
'-b', 'test environment',
])
assert result.exit_code == 1
# If we specify an inline URL, it will over-ride the environment
# variable
result = runner.invoke(cli.main, [
'-t', 'test title',
'-b', 'test body',
'good://localhost',
])
assert result.exit_code == 0
# A Config file also over-rides the environment variable if
# specified on the command line:
result = runner.invoke(cli.main, [
'-b', 'has myTag',
'--config', str(t2),
'--tag', 'myTag',
])
assert result.exit_code == 0
with environ(APPRISE_CONFIG=str(t2)):
# Our configuration file will load from our environmment variable
result = runner.invoke(cli.main, [
'-b', 'has myTag',
'--tag', 'myTag',
])
assert result.exit_code == 0
with mock.patch('apprise.cli.DEFAULT_CONFIG_PATHS', []):
with environ(APPRISE_CONFIG=" "):
# We will fail to send the notification as no path was
# specified.
# We override the DEFAULT_CONFIG_PATHS because we don't
# want to detect ones loaded on the machine running the unit tests
result = runner.invoke(cli.main, [
'-b', 'my message',
])
assert result.exit_code == 1
with environ(APPRISE_CONFIG="garbage/file/path.yaml"):
# We will fail to send the notification as the path
# specified is not loadable
result = runner.invoke(cli.main, [
'-b', 'my message',
])
assert result.exit_code == 1
# We can force an over-ride by specifying a config file on the
# command line options:
result = runner.invoke(cli.main, [
'-b', 'has myTag',
'--config', str(t2),
'--tag', 'myTag',
])
assert result.exit_code == 0
# Just a general test; if both the --config and urls are specified
# then the the urls trumps all
result = runner.invoke(cli.main, [
'-b', 'has myTag',
'--config', str(t2),
'good://localhost',
'--tag', 'fail',
])
# Tags are ignored, URL specified, so it trump config
assert result.exit_code == 0
# we just repeat the test as a proof that it only executes
# the urls despite the fact the --config was specified
result = runner.invoke(cli.main, [
'-b', 'reads the url entry only',
'--config', str(t2),
'good://localhost',
'--tag', 'fail',
])
# Tags are ignored, URL specified, so it trump config
assert result.exit_code == 0
# once agian, but we call bad://
result = runner.invoke(cli.main, [
'-b', 'reads the url entry only',
'--config', str(t2),
'bad://localhost',
'--tag', 'myTag',
])
assert result.exit_code == 1
# Test Escaping:
result = runner.invoke(cli.main, [
'-e',
'-t', 'test\ntitle',
'-b', 'test\nbody',
'good://localhost',
])
assert result.exit_code == 0
# Test Escaping (without title)
result = runner.invoke(cli.main, [
'--interpret-escapes',
'-b', 'test\nbody',
'good://localhost',
])
assert result.exit_code == 0
# Test Emojis:
result = runner.invoke(cli.main, [
'-j',
'-t', ':smile:',
'-b', ':grin:',
'good://localhost',
])
assert result.exit_code == 0
result = runner.invoke(cli.main, [
'--interpret-emojis',
'-t', ':smile:',
'-b', ':grin:',
'good://localhost',
])
assert result.exit_code == 0
def test_apprise_cli_modules(tmpdir):
"""
CLI: --plugin (-P)
"""
runner = CliRunner()
#
# Loading of modules works correctly
#
notify_cmod_base = tmpdir.mkdir('cli_modules')
notify_cmod = notify_cmod_base.join('hook.py')
notify_cmod.write(cleandoc("""
from apprise.decorators import notify
@notify(on="climod")
def mywrapper(body, title, notify_type, *args, **kwargs):
pass
"""))
result = runner.invoke(cli.main, [
'--plugin-path', str(notify_cmod),
'-t', 'title',
'-b', 'body',
'climod://',
])
assert result.exit_code == 0
# Test -P
result = runner.invoke(cli.main, [
'-P', str(notify_cmod),
'-t', 'title',
'-b', 'body',
'climod://',
])
assert result.exit_code == 0
# Test double hooks
notify_cmod2 = notify_cmod_base.join('hook2.py')
notify_cmod2.write(cleandoc("""
from apprise.decorators import notify
@notify(on="climod2")
def mywrapper(body, title, notify_type, *args, **kwargs):
pass
"""))
result = runner.invoke(cli.main, [
'--plugin-path', str(notify_cmod),
'--plugin-path', str(notify_cmod2),
'-t', 'title',
'-b', 'body',
'climod://',
'climod2://',
])
assert result.exit_code == 0
@pytest.mark.skipif(
sys.platform == "win32", reason="Unreliable results to be determined")
def test_apprise_cli_persistent_storage(tmpdir):
"""
CLI: test persistent storage
"""
# This is a made up class that is just used to verify
class NoURLIDNotification(NotifyBase):
"""
A no URL ID
"""
# Update URL identifier
url_identifier = False
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def send(self, **kwargs):
# Pretend everything is okay
return True
def url(self, *args, **kwargs):
# Support URL
return 'noper://'
def parse_url(self, *args, **kwargs):
# parse our url
return {'schema': 'noper'}
# This is a made up class that is just used to verify
class TestNotification(NotifyBase):
"""
A Testing Script
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def send(self, **kwargs):
# Test our persistent settings
self.store.set('key', 'value')
assert self.store.get('key') == 'value'
# Pretend everything is okay
return True
def url(self, *args, **kwargs):
# Support URL
return 'test://'
def parse_url(self, *args, **kwargs):
# parse our url
return {'schema': 'test'}
# assign test:// to our notification defined above
N_MGR['test'] = TestNotification
N_MGR['noper'] = NoURLIDNotification
# Write a simple text based configuration file
config = tmpdir.join("apprise.cfg")
buf = cleandoc("""
# Create a config file we can source easily
test=test://
noper=noper://
# Define a second test URL that will
two-urls=test://
# Create another entry that has no tag associatd with it
test://?entry=2
""")
config.write(buf)
runner = CliRunner()
# Generate notification that creates persistent data
result = runner.invoke(cli.main, [
'--storage-path', str(tmpdir),
'--config', str(config),
'storage',
'list',
])
# list our entries
assert result.exit_code == 0
# our persist storage has not been created yet
_stdout = result.stdout.strip()
assert re.match(
r'^1\.\s+[a-z0-9]{8}\s+0\.00B\s+unused\s+-\s+test://\s*', _stdout,
re.MULTILINE)
# An invalid mode specified
result = runner.invoke(cli.main, [
'--storage-path', str(tmpdir),
'--storage-mode', 'invalid',
'--config', str(config),
'-g', 'test',
'-t', 'title',
'-b', 'body',
])
# Bad mode specified
assert result.exit_code == 2
# Invalid uid lenth specified
result = runner.invoke(cli.main, [
'--storage-path', str(tmpdir),
'--storage-mode', 'flush',
'--storage-uid-length', 1,
'--config', str(config),
'-g', 'test',
'-t', 'title',
'-b', 'body',
])
# storage uid length to small
assert result.exit_code == 2
# No files written yet; just config file exists
dir_content = os.listdir(str(tmpdir))
assert len(dir_content) == 1
assert 'apprise.cfg' in dir_content
# Generate notification that creates persistent data
result = runner.invoke(cli.main, [
'--storage-path', str(tmpdir),
'--storage-mode', 'flush',
'--config', str(config),
'-t', 'title',
'-b', 'body',
'-g', 'test',
])
# We parsed our data accordingly
assert result.exit_code == 0
dir_content = os.listdir(str(tmpdir))
assert len(dir_content) == 2
assert 'apprise.cfg' in dir_content
assert 'ea482db7' in dir_content
# Have a look at our storage listings
result = runner.invoke(cli.main, [
'--storage-path', str(tmpdir),
'--config', str(config),
'storage',
'list',
])
# list our entries
assert result.exit_code == 0
_stdout = result.stdout.strip()
assert re.match(
r'^1\.\s+[a-z0-9_-]{8}\s+81\.00B\s+active\s+-\s+test://$', _stdout,
re.MULTILINE)
# keyword list is not required
result = runner.invoke(cli.main, [
'--storage-path', str(tmpdir),
'--config', str(config),
'storage',
])
# list our entries
assert result.exit_code == 0
_stdout = result.stdout.strip()
assert re.match(
r'^1\.\s+[a-z0-9_-]{8}\s+81\.00B\s+active\s+-\s+test://$', _stdout,
re.MULTILINE)
# search on something that won't match
result = runner.invoke(cli.main, [
'--storage-path', str(tmpdir),
'--config', str(config),
'storage',
'list',
'nomatch',
])
# list our entries
assert result.exit_code == 0
assert not result.stdout.strip()
# closest match search
result = runner.invoke(cli.main, [
'--storage-path', str(tmpdir),
'--config', str(config),
'storage',
'list',
# Closest match will hit a result
'ea',
])
# list our entries
assert result.exit_code == 0
_stdout = result.stdout.strip()
assert re.match(
r'^1\.\s+[a-z0-9_-]{8}\s+81\.00B\s+active\s+-\s+test://$', _stdout,
re.MULTILINE)
# list is the presumed option if no match
result = runner.invoke(cli.main, [
'--storage-path', str(tmpdir),
'--config', str(config),
'storage',
# Closest match will hit a result
'ea',
])
# list our entries successfully again..
assert result.exit_code == 0
_stdout = result.stdout.strip()
assert re.match(
r'^1\.\s+[a-z0-9_-]{8}\s+81\.00B\s+active\s+-\s+test://$', _stdout,
re.MULTILINE)
# Search based on tag
result = runner.invoke(cli.main, [
'--storage-path', str(tmpdir),
'--config', str(config),
'storage',
'list',
# We can match by tags too
'-g', 'test',
])
# list our entries
assert result.exit_code == 0
_stdout = result.stdout.strip()
assert re.match(
r'^1\.\s+[a-z0-9_-]{8}\s+81\.00B\s+active\s+-\s+test://$', _stdout,
re.MULTILINE)
# Prune call but prune-days set incorrectly
result = runner.invoke(cli.main, [
'--storage-path', str(tmpdir),
'--storage-prune-days', -1,
'storage',
'prune',
])
# storage prune days is invalid
assert result.exit_code == 2
# Create a tmporary namespace
tmpdir.mkdir('namespace')
# Generates another listing
result = runner.invoke(cli.main, [
'--storage-path', str(tmpdir),
'--config', str(config),
'storage',
])
# list our entries
assert result.exit_code == 0
_stdout = result.stdout.strip()
assert re.match(
r'^[0-9]\.\s+[a-z0-9_-]{8}\s+81\.00B\s+active\s+-\s+test://$', _stdout,
re.MULTILINE)
assert re.match(
r'.*\s*[0-9]\.\s+namespace\s+0\.00B\s+stale.*', _stdout,
(re.MULTILINE | re.DOTALL))
# Generates another listing but utilize the tag
result = runner.invoke(cli.main, [
'--storage-path', str(tmpdir),
'--config', str(config),
'--tag', 'test',
'storage',
])
# list our entries
assert result.exit_code == 0
_stdout = result.stdout.strip()
assert re.match(
r'^[0-9]\.\s+[a-z0-9_-]{8}\s+81\.00B\s+active\s+-\s+test://$', _stdout,
re.MULTILINE)
assert re.match(
r'.*\s*[0-9]\.\s+namespace\s+0\.00B\s+stale.*', _stdout,
(re.MULTILINE | re.DOTALL)) is None
# Clear all of our accumulated disk space
result = runner.invoke(cli.main, [
'--storage-path', str(tmpdir),
'--config', str(config),
'storage',
'clear',
])
# successful
assert result.exit_code == 0
# Generate another listing
result = runner.invoke(cli.main, [
'--storage-path', str(tmpdir),
'--config', str(config),
'storage',
])
# list our entries
assert result.exit_code == 0
_stdout = result.stdout.strip()
# back to unused state and 0 bytes
assert re.match(
r'^[0-9]\.\s+[a-z0-9_-]{8}\s+0\.00B\s+unused\s+-\s+test://$', _stdout,
re.MULTILINE)
# namespace is gone now
assert re.match(
r'.*\s*[0-9]\.\s+namespace\s+0\.00B\s+stale.*', _stdout,
(re.MULTILINE | re.DOTALL)) is None
# Provide both tags and uid
result = runner.invoke(cli.main, [
'--storage-path', str(tmpdir),
'--config', str(config),
'storage',
'ea',
'-g', 'test',
])
# list our entries
assert result.exit_code == 0
_stdout = result.stdout.strip()
# back to unused state and 0 bytes
assert re.match(
r'^[0-9]\.\s+[a-z0-9_-]{8}\s+0\.00B\s+unused\s+-\s+test://$', _stdout,
re.MULTILINE)
# Generate notification that creates persistent data
result = runner.invoke(cli.main, [
'--storage-path', str(tmpdir),
'--storage-mode', 'flush',
'--config', str(config),
'-t', 'title',
'-b', 'body',
'-g', 'test',
])
# We parsed our data accordingly
assert result.exit_code == 0
# Have a look at our storage listings
result = runner.invoke(cli.main, [
'--storage-path', str(tmpdir),
'--config', str(config),
'storage',
'list',
])
# list our entries
assert result.exit_code == 0
_stdout = result.stdout.strip()
assert re.match(
r'^1\.\s+[a-z0-9_-]{8}\s+81\.00B\s+active\s+-\s+test://$', _stdout,
re.MULTILINE)
# Prune call but prune-days set incorrectly
result = runner.invoke(cli.main, [
'--storage-path', str(tmpdir),
'storage',
'prune',
])
# Run our prune successfully
assert result.exit_code == 0
# Have a look at our storage listings (expected no change in output)
result = runner.invoke(cli.main, [
'--storage-path', str(tmpdir),
'--config', str(config),
'storage',
'list',
])
# list our entries
assert result.exit_code == 0
_stdout = result.stdout.strip()
assert re.match(
r'^1\.\s+[a-z0-9_-]{8}\s+81\.00B\s+active\s+-\s+test://$', _stdout,
re.MULTILINE)
# Prune call but prune-days set incorrectly
result = runner.invoke(cli.main, [
'--storage-path', str(tmpdir),
# zero simulates a full clean
'--storage-prune-days', 0,
'storage',
'prune',
])
# Run our prune successfully
assert result.exit_code == 0
# Have a look at our storage listings (expected no change in output)
result = runner.invoke(cli.main, [
'--storage-path', str(tmpdir),
'--config', str(config),
'storage',
'list',
])
# list our entries
assert result.exit_code == 0
# Note: An prune/expiry of zero gets everything except for MS Windows
# during testing only.
# Until this can be resolved, this is the section of the test that
# caused us to disable it in MS Windows
_stdout = result.stdout.strip()
assert re.match(
r'^1\.\s+[a-z0-9_-]{8}\s+0\.00B\s+unused\s+-\s+test://$', _stdout,
re.MULTILINE)
# New Temporary namespace
new_persistent_base = tmpdir.mkdir('namespace')
with environ(APPRISE_STORAGE_PATH=str(new_persistent_base)):
# Reload our module
reload(cli)
# Nothing in our directory yet
dir_content = os.listdir(str(new_persistent_base))
assert len(dir_content) == 0
# Generate notification that creates persistent data
# storage path is pulled out of our environment variable
result = runner.invoke(cli.main, [
'--storage-mode', 'flush',
'--config', str(config),
'-t', 'title',
'-b', 'body',
'-g', 'test',
])
# We parsed our data accordingly
assert result.exit_code == 0
# Now content exists
dir_content = os.listdir(str(new_persistent_base))
assert len(dir_content) == 1
# Reload our module with our environment variable gone
reload(cli)
# Clear loaded modules
N_MGR.unload_modules()
def test_apprise_cli_details(tmpdir):
"""
CLI: --details (-l)
"""
runner = CliRunner()
#
# Testing the printout of our details
# --details or -l
#
result = runner.invoke(cli.main, [
'--details',
])
assert result.exit_code == 0
result = runner.invoke(cli.main, [
'-l',
])
assert result.exit_code == 0
# Clear loaded modules
N_MGR.unload_modules()
# This is a made up class that is just used to verify
class TestReq01Notification(NotifyBase):
"""
This class is used to test various requirement configurations
"""
# Set some requirements
requirements = {
'packages_required': [
'cryptography <= 3.4',
'ultrasync',
],
'packages_recommended': 'django',
}
def url(self, **kwargs):
# Support URL
return ''
def send(self, **kwargs):
# Pretend everything is okay (so we don't break other tests)
return True
N_MGR['req01'] = TestReq01Notification
# This is a made up class that is just used to verify
class TestReq02Notification(NotifyBase):
"""
This class is used to test various requirement configurations
"""
# Just not enabled at all
enabled = False
# Set some requirements
requirements = {
# None and/or [] is implied, but jsut to show that the code won't
# crash if explicitly set this way:
'packages_required': None,
'packages_recommended': [
'cryptography <= 3.4',
]
}
def url(self, **kwargs):
# Support URL
return ''
def send(self, **kwargs):
# Pretend everything is okay (so we don't break other tests)
return True
N_MGR['req02'] = TestReq02Notification
# This is a made up class that is just used to verify
class TestReq03Notification(NotifyBase):
"""
This class is used to test various requirement configurations
"""
# Set some requirements (but additionally include a details over-ride)
requirements = {
# We can over-ride the default details assigned to our plugin if
# specified
'details': _('some specified requirement details'),
# We can set a string value as well (it does not have to be a list)
'packages_recommended': 'cryptography <= 3.4'
}
def url(self, **kwargs):
# Support URL
return ''
def send(self, **kwargs):
# Pretend everything is okay (so we don't break other tests)
return True
N_MGR['req03'] = TestReq03Notification
# This is a made up class that is just used to verify
class TestReq04Notification(NotifyBase):
"""
This class is used to test a case where our requirements is fixed
to a None
"""
# This is the same as saying there are no requirements
requirements = None
def url(self, **kwargs):
# Support URL
return ''
def send(self, **kwargs):
# Pretend everything is okay (so we don't break other tests)
return True
N_MGR['req04'] = TestReq04Notification
# This is a made up class that is just used to verify
class TestReq05Notification(NotifyBase):
"""
This class is used to test a case where only packages_recommended
is identified
"""
requirements = {
'packages_recommended': 'cryptography <= 3.4'
}
def url(self, **kwargs):
# Support URL
return ''
def send(self, **kwargs):
# Pretend everything is okay (so we don't break other tests)
return True
N_MGR['req05'] = TestReq05Notification
class TestDisabled01Notification(NotifyBase):
"""
This class is used to test a pre-disabled state
"""
# Just flat out disable our service
enabled = False
# we'll use this as a key to make our service easier to find
# in the next part of the testing
service_name = 'na01'
def url(self, **kwargs):
# Support URL
return ''
def notify(self, **kwargs):
# Pretend everything is okay (so we don't break other tests)
return True
N_MGR['na01'] = TestDisabled01Notification
class TestDisabled02Notification(NotifyBase):
"""
This class is used to test a post-disabled state
"""
# we'll use this as a key to make our service easier to find
# in the next part of the testing
service_name = 'na02'
def __init__(self, *args, **kwargs):
super().__init__(**kwargs)
# enable state changes **AFTER** we initialize
self.enabled = False
def url(self, **kwargs):
# Support URL
return ''
def notify(self, **kwargs):
# Pretend everything is okay (so we don't break other tests)
return True
N_MGR['na02'] = TestDisabled02Notification
# We'll add a good notification to our list
class TesEnabled01Notification(NotifyBase):
"""
This class is just a simple enabled one
"""
# we'll use this as a key to make our service easier to find
# in the next part of the testing
service_name = 'good'
def url(self, **kwargs):
# Support URL
return ''
def send(self, **kwargs):
# Pretend everything is okay (so we don't break other tests)
return True
N_MGR['good'] = TesEnabled01Notification
# Verify that we can pass through all of our different details
result = runner.invoke(cli.main, [
'--details',
])
assert result.exit_code == 0
result = runner.invoke(cli.main, [
'-l',
])
assert result.exit_code == 0
# Clear loaded modules
N_MGR.unload_modules()
def test_apprise_cli_print_help():
"""
CLI: --help (-h)
"""
runner = CliRunner()
# Clear our working variables so they don't obstruct the next test
# This simulates an actual call from the CLI. Unfortunately through
# testing were occupying the same memory space so our singleton's
# have already been populated
N_MGR._paths_previously_scanned.clear()
N_MGR._custom_module_map.clear()
# Print help and exit
result = runner.invoke(cli.main, ['--help'])
assert result.exit_code == 0
result = runner.invoke(cli.main, ['-h'])
assert result.exit_code == 0
@mock.patch('requests.post')
def test_apprise_cli_plugin_loading(mock_post, tmpdir):
"""
CLI: --plugin-path (-P)
"""
# Prepare Mock
mock_post.return_value = requests.Request()
mock_post.return_value.status_code = requests.codes.ok
runner = CliRunner()
# Clear our working variables so they don't obstruct the next test
# This simulates an actual call from the CLI. Unfortunately through
# testing were occupying the same memory space so our singleton's
# have already been populated
N_MGR._paths_previously_scanned.clear()
N_MGR._custom_module_map.clear()
# Test a path that has no files to load in it
result = runner.invoke(cli.main, [
'--plugin-path', join(str(tmpdir), 'invalid_path'),
'-b', 'test\nbody',
'json://localhost',
])
# The path is silently loaded but fails... it's okay because the
# notification we're choosing to notify does exist
assert result.exit_code == 0
# Directories that don't exist passed in by the CLI aren't even scanned
assert len(N_MGR._paths_previously_scanned) == 0
assert len(N_MGR._custom_module_map) == 0
# Test our current existing path that has no entries in it
result = runner.invoke(cli.main, [
'--plugin-path', str(tmpdir.mkdir('empty')),
'-b', 'test\nbody',
'json://localhost',
])
# The path is silently loaded but fails... it's okay because the
# notification we're choosing to notify does exist
assert result.exit_code == 0
assert len(N_MGR._paths_previously_scanned) == 1
assert join(str(tmpdir), 'empty') in \
N_MGR._paths_previously_scanned
# However there was nothing to load
assert len(N_MGR._custom_module_map) == 0
# Clear our working variables so they don't obstruct the next test
# This simulates an actual call from the CLI. Unfortunately through
# testing were occupying the same memory space so our singleton's
# have already been populated
N_MGR._paths_previously_scanned.clear()
N_MGR._custom_module_map.clear()
# Prepare ourselves a file to work with
notify_hook_a_base = tmpdir.mkdir('random')
notify_hook_a = notify_hook_a_base.join('myhook01.py')
notify_hook_a.write(cleandoc("""
raise ImportError
"""))
result = runner.invoke(cli.main, [
'--plugin-path', str(notify_hook_a),
'-b', 'test\nbody',
# A custom hook:
'clihook://',
])
# It doesn't exist so it will fail
# meanwhile we would have failed to load the myhook path
assert result.exit_code == 1
# The path is silently loaded but fails... it's okay because the
# notification we're choosing to notify does exist
assert len(N_MGR._paths_previously_scanned) == 1
assert str(notify_hook_a) in N_MGR._paths_previously_scanned
# However there was nothing to load
assert len(N_MGR._custom_module_map) == 0
# Prepare ourselves a file to work with
notify_hook_aa = notify_hook_a_base.join('myhook02.py')
notify_hook_aa.write(cleandoc("""
garbage entry
"""))
N_MGR.plugins()
result = runner.invoke(cli.main, [
'--plugin-path', str(notify_hook_aa),
'-b', 'test\nbody',
# A custom hook:
'clihook://custom',
])
# It doesn't exist so it will fail
# meanwhile we would have failed to load the myhook path
assert result.exit_code == 1
# The path is silently loaded but fails...
# as a result the path stacks with the last
assert len(N_MGR._paths_previously_scanned) == 2
assert str(notify_hook_a) in \
N_MGR._paths_previously_scanned
assert str(notify_hook_aa) in \
N_MGR._paths_previously_scanned
# However there was nothing to load
assert len(N_MGR._custom_module_map) == 0
# Clear our working variables so they don't obstruct the next test
# This simulates an actual call from the CLI. Unfortunately through
# testing were occupying the same memory space so our singleton's
# have already been populated
N_MGR._paths_previously_scanned.clear()
N_MGR._custom_module_map.clear()
# Prepare ourselves a file to work with
notify_hook_b = tmpdir.mkdir('goodmodule').join('__init__.py')
notify_hook_b.write(cleandoc("""
from apprise.decorators import notify
# We want to trigger on anyone who configures a call to clihook://
@notify(on="clihook")
def mywrapper(body, title, notify_type, *args, **kwargs):
# A simple test - print to screen
print("{}: {} - {}".format(notify_type, title, body))
# No return (so a return of None) get's translated to True
# Define another in the same file
@notify(on="clihookA")
def mywrapper(body, title, notify_type, *args, **kwargs):
# A simple test - print to screen
print("!! {}: {} - {}".format(notify_type, title, body))
# No return (so a return of None) get's translated to True
"""))
result = runner.invoke(cli.main, [
'--plugin-path', str(tmpdir),
'-b', 'test body',
# A custom hook:
'clihook://still/valid',
])
# We can detect the goodmodule (which has an __init__.py in it)
# so we'll load okay
assert result.exit_code == 0
# Let's see how things got loaded:
assert len(N_MGR._paths_previously_scanned) == 2
assert str(tmpdir) in N_MGR._paths_previously_scanned
# absolute path to detected module is also added
assert join(str(tmpdir), 'goodmodule', '__init__.py') \
in N_MGR._paths_previously_scanned
# We also loaded our clihook properly
assert len(N_MGR._custom_module_map) == 1
# We can find our new hook loaded in our schema map now...
assert 'clihook' in N_MGR
# Capture our key for reference
key = [k for k in N_MGR._custom_module_map.keys()][0]
# We loaded 2 entries from the same file
assert len(N_MGR._custom_module_map[key]['notify']) == 2
assert 'clihook' in N_MGR._custom_module_map[key]['notify']
# Converted to lower case
assert 'clihooka' in N_MGR._custom_module_map[key]['notify']
# Our function name
assert N_MGR._custom_module_map[key]['notify']['clihook']['fn_name'] \
== 'mywrapper'
# What we parsed from the `on` keyword in the @notify decorator
assert N_MGR._custom_module_map[key]['notify']['clihook']['url'] \
== 'clihook://'
# our default name Assignment. This can be-overridden on the @notify
# decorator by just adding a name= to the parameter list
assert N_MGR['clihook'].service_name == 'Custom - clihook'
# Our Base Notification object when initialized:
assert len(
N_MGR._module_map[N_MGR._custom_module_map[key]['name']]['plugin']) \
== 2
for plugin in \
N_MGR._module_map[N_MGR._custom_module_map[key]['name']]['plugin']:
assert isinstance(plugin(), NotifyBase)
# Clear our working variables so they don't obstruct the next test
# This simulates an actual call from the CLI. Unfortunately through
# testing were occupying the same memory space so our singleton's
# have already been populated
N_MGR._paths_previously_scanned.clear()
N_MGR._custom_module_map.clear()
del N_MGR['clihook']
result = runner.invoke(cli.main, [
'--plugin-path', str(notify_hook_b),
'-b', 'test body',
# A custom hook:
'clihook://',
])
# Absolute path to __init__.py is okay
assert result.exit_code == 0
# we can verify that it prepares our message
assert result.stdout.strip() == 'info: - test body'
# Clear our working variables so they don't obstruct the next test
# This simulates an actual call from the CLI. Unfortunately through
# testing were occupying the same memory space so our singleton's
# have already been populated
N_MGR._paths_previously_scanned.clear()
N_MGR._custom_module_map.clear()
del N_MGR['clihook']
result = runner.invoke(cli.main, [
'--plugin-path', dirname(str(notify_hook_b)),
'-b', 'test body',
# A custom hook:
'clihook://',
])
# Now we succeed to load our module when pointed to it only because
# an __init__.py is found on the inside of it
assert result.exit_code == 0
# we can verify that it prepares our message
assert result.stdout.strip() == 'info: - test body'
# Test double paths that are the same; this ensures we only
# load the plugin once
result = runner.invoke(cli.main, [
'--plugin-path', dirname(str(notify_hook_b)),
'--plugin-path', str(notify_hook_b),
'--details',
])
# Now we succeed to load our module when pointed to it only because
# an __init__.py is found on the inside of it
assert result.exit_code == 0
# Clear our working variables so they don't obstruct the next test
# This simulates an actual call from the CLI. Unfortunately through
# testing were occupying the same memory space so our singleton's
# have already been populated
N_MGR._paths_previously_scanned.clear()
N_MGR._custom_module_map.clear()
del N_MGR['clihook']
# Prepare ourselves a file to work with
notify_hook_b = tmpdir.mkdir('complex').join('complex.py')
notify_hook_b.write(cleandoc("""
from apprise.decorators import notify
# We can't over-ride an element that already exists
# in this case json://
@notify(on="json")
def mywrapper_01(body, title, notify_type, *args, **kwargs):
# Return True (same as None)
return True
@notify(on="willfail", name="always failing...")
def mywrapper_02(body, title, notify_type, *args, **kwargs):
# Simply fail
return False
@notify(on="clihook1", name="the original clihook entry")
def mywrapper_03(body, title, notify_type, *args, **kwargs):
# Return True
return True
# This is a duplicate o the entry above, so it can not be
# loaded...
@notify(on="clihook1", name="a duplicate of the clihook entry")
def mywrapper_04(body, title, notify_type, *args, **kwargs):
# Return True
return True
# This is where things get realy cool... we can not only
# define the schema we want to over-ride, but we can define
# some default values to pass into our wrapper function to
# act as a base before whatever was actually passed in is
# applied ontop.... think of it like templating information
@notify(on="clihook2://localhost")
def mywrapper_05(body, title, notify_type, *args, **kwargs):
# Return True
return True
# This can't load because of the defined schema/on definition
@notify(on="", name="an invalid schema was specified")
def mywrapper_06(body, title, notify_type, *args, **kwargs):
return True
"""))
result = runner.invoke(cli.main, [
'--plugin-path', join(str(tmpdir), 'complex'),
'-b', 'test body',
# A custom hook that does not exist
'clihook://',
])
# Since clihook:// isn't in our complex listing, this will fail
assert result.exit_code == 1
# Let's see how things got loaded
assert len(N_MGR._paths_previously_scanned) == 2
# Our path we specified on the CLI...
assert join(str(tmpdir), 'complex') in N_MGR._paths_previously_scanned
# absolute path to detected module is also added
assert join(str(tmpdir), 'complex', 'complex.py') \
in N_MGR._paths_previously_scanned
# We loaded our one module successfuly
assert len(N_MGR._custom_module_map) == 1
# We can find our new hook loaded in our SCHEMA_MAP now...
assert 'willfail' in N_MGR
assert 'clihook1' in N_MGR
assert 'clihook2' in N_MGR
# Capture our key for reference
key = [k for k in N_MGR._custom_module_map.keys()][0]
assert len(N_MGR._custom_module_map[key]['notify']) == 3
assert 'willfail' in N_MGR._custom_module_map[key]['notify']
assert 'clihook1' in N_MGR._custom_module_map[key]['notify']
# We only load 1 instance of the clihook2, the second will fail
assert 'clihook2' in N_MGR._custom_module_map[key]['notify']
# We can never load previously created notifications
assert 'json' not in N_MGR._custom_module_map[key]['notify']
result = runner.invoke(cli.main, [
'--plugin-path', join(str(tmpdir), 'complex'),
'-b', 'test body',
# A custom notification set up for failure
'willfail://',
])
# Note that the failure of the decorator carries all the way back
# to the CLI
assert result.exit_code == 1
result = runner.invoke(cli.main, [
'--plugin-path', join(str(tmpdir), 'complex'),
'-b', 'test body',
# our clihook that returns true
'clihook1://',
# our other loaded clihook
'clihook2://',
])
# Note that the failure of the decorator carries all the way back
# to the CLI
assert result.exit_code == 0
result = runner.invoke(cli.main, [
'--plugin-path', join(str(tmpdir), 'complex'),
# Print our custom details to the screen
'--details',
])
assert 'willfail' in result.stdout
assert 'always failing...' in result.stdout
assert 'clihook1' in result.stdout
assert 'the original clihook entry' in result.stdout
assert 'a duplicate of the clihook entry' not in result.stdout
assert 'clihook2' in result.stdout
assert 'Custom - clihook2' in result.stdout
# Note that the failure of the decorator carries all the way back
# to the CLI
assert result.exit_code == 0
@mock.patch('platform.system')
def test_apprise_cli_windows_env(mock_system):
"""
CLI: Windows Environment
"""
# Force a windows environment
mock_system.return_value = 'Windows'
# Reload our module
reload(cli)