Apprise configuration support for 'include' keyword (#278)

pull/282/head
Chris Caron 2020-08-25 17:54:31 -04:00 committed by GitHub
parent 6e1b8a0bd6
commit 25514643f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 1160 additions and 258 deletions

View File

@ -46,7 +46,8 @@ class AppriseConfig(object):
"""
def __init__(self, paths=None, asset=None, cache=True, **kwargs):
def __init__(self, paths=None, asset=None, cache=True, recursion=0,
insecure_includes=False, **kwargs):
"""
Loads all of the paths specified (if any).
@ -69,6 +70,29 @@ class AppriseConfig(object):
It's also worth nothing that the cache value is only set to elements
that are not already of subclass ConfigBase()
recursion defines how deep we recursively handle entries that use the
`import` keyword. This keyword requires us to fetch more configuration
from another source and add it to our existing compilation. If the
file we remotely retrieve also has an `import` reference, we will only
advance through it if recursion is set to 2 deep. If set to zero
it is off. There is no limit to how high you set this value. It would
be recommended to keep it low if you do intend to use it.
insecure includes by default are disabled. When set to True, all
Apprise Config files marked to be in STRICT mode are treated as being
in ALWAYS mode.
Take a file:// based configuration for example, only a file:// based
configuration can import another file:// based one. because it is set
to STRICT mode. If an http:// based configuration file attempted to
import a file:// one it woul fail. However this import would be
possible if insecure_includes is set to True.
There are cases where a self hosting apprise developer may wish to load
configuration from memory (in a string format) that contains import
entries (even file:// based ones). In these circumstances if you want
these includes to be honored, this value must be set to True.
"""
# Initialize a server list of URLs
@ -81,13 +105,20 @@ class AppriseConfig(object):
# Set our cache flag
self.cache = cache
# Initialize our recursion value
self.recursion = recursion
# Initialize our insecure_includes flag
self.insecure_includes = insecure_includes
if paths is not None:
# Store our path(s)
self.add(paths)
return
def add(self, configs, asset=None, tag=None, cache=True):
def add(self, configs, asset=None, tag=None, cache=True, recursion=None,
insecure_includes=None):
"""
Adds one or more config URLs into our list.
@ -107,6 +138,12 @@ class AppriseConfig(object):
It's also worth nothing that the cache value is only set to elements
that are not already of subclass ConfigBase()
Optionally override the default recursion value.
Optionally override the insecure_includes flag.
if insecure_includes is set to True then all plugins that are
set to a STRICT mode will be a treated as ALWAYS.
"""
# Initialize our return status
@ -115,6 +152,14 @@ class AppriseConfig(object):
# Initialize our default cache value
cache = cache if cache is not None else self.cache
# Initialize our default recursion value
recursion = recursion if recursion is not None else self.recursion
# Initialize our default insecure_includes value
insecure_includes = \
insecure_includes if insecure_includes is not None \
else self.insecure_includes
if asset is None:
# prepare default asset
asset = self.asset
@ -154,7 +199,8 @@ class AppriseConfig(object):
# Instantiate ourselves an object, this function throws or
# returns None if it fails
instance = AppriseConfig.instantiate(
_config, asset=asset, tag=tag, cache=cache)
_config, asset=asset, tag=tag, cache=cache,
recursion=recursion, insecure_includes=insecure_includes)
if not isinstance(instance, ConfigBase):
return_status = False
continue
@ -165,7 +211,8 @@ class AppriseConfig(object):
# Return our status
return return_status
def add_config(self, content, asset=None, tag=None, format=None):
def add_config(self, content, asset=None, tag=None, format=None,
recursion=None, insecure_includes=None):
"""
Adds one configuration file in it's raw format. Content gets loaded as
a memory based object and only exists for the life of this
@ -174,8 +221,22 @@ class AppriseConfig(object):
If you know the format ('yaml' or 'text') you can specify
it for slightly less overhead during this call. Otherwise the
configuration is auto-detected.
Optionally override the default recursion value.
Optionally override the insecure_includes flag.
if insecure_includes is set to True then all plugins that are
set to a STRICT mode will be a treated as ALWAYS.
"""
# Initialize our default recursion value
recursion = recursion if recursion is not None else self.recursion
# Initialize our default insecure_includes value
insecure_includes = \
insecure_includes if insecure_includes is not None \
else self.insecure_includes
if asset is None:
# prepare default asset
asset = self.asset
@ -190,7 +251,8 @@ class AppriseConfig(object):
# Create ourselves a ConfigMemory Object to store our configuration
instance = config.ConfigMemory(
content=content, format=format, asset=asset, tag=tag)
content=content, format=format, asset=asset, tag=tag,
recursion=self.recursion, insecure_includes=insecure_includes)
# Add our initialized plugin to our server listings
self.configs.append(instance)
@ -235,6 +297,7 @@ class AppriseConfig(object):
@staticmethod
def instantiate(url, asset=None, tag=None, cache=None,
recursion=0, insecure_includes=False,
suppress_exceptions=True):
"""
Returns the instance of a instantiated configuration plugin based on
@ -279,6 +342,12 @@ class AppriseConfig(object):
# Force an over-ride of the cache value to what we have specified
results['cache'] = cache
# Recursion can never be parsed from the URL
results['recursion'] = recursion
# Insecure includes flag can never be parsed from the URL
results['insecure_includes'] = insecure_includes
if suppress_exceptions:
try:
# Attempt to create an instance of our plugin using the parsed

View File

@ -674,3 +674,24 @@ class URLBase(object):
response = ''
return response
def schemas(self):
"""A simple function that returns a set of all schemas associated
with this object based on the object.protocol and
object.secure_protocol
"""
schemas = set([])
for key in ('protocol', 'secure_protocol'):
schema = getattr(self, key, None)
if isinstance(schema, six.string_types):
schemas.add(schema)
elif isinstance(schema, (set, list, tuple)):
# Support iterables list types
for s in schema:
if isinstance(s, six.string_types):
schemas.add(s)
return schemas

View File

@ -41,6 +41,8 @@ from .common import OverflowMode
from .common import OVERFLOW_MODES
from .common import ConfigFormat
from .common import CONFIG_FORMATS
from .common import ConfigIncludeMode
from .common import CONFIG_INCLUDE_MODES
from .URLBase import URLBase
from .URLBase import PrivacyMode
@ -66,5 +68,7 @@ __all__ = [
# Reference
'NotifyType', 'NotifyImageSize', 'NotifyFormat', 'OverflowMode',
'NOTIFY_TYPES', 'NOTIFY_IMAGE_SIZES', 'NOTIFY_FORMATS', 'OVERFLOW_MODES',
'ConfigFormat', 'CONFIG_FORMATS', 'PrivacyMode',
'ConfigFormat', 'CONFIG_FORMATS',
'ConfigIncludeMode', 'CONFIG_INCLUDE_MODES',
'PrivacyMode',
]

View File

@ -46,6 +46,10 @@ from . import __version__
from . import __license__
from . import __copywrite__
# By default we allow looking 1 level down recursivly in Apprise configuration
# files.
DEFAULT_RECURSION_DEPTH = 1
# Defines our click context settings adding -h to the additional options that
# can be specified to get the help menu to come up
CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
@ -129,6 +133,11 @@ def print_version_msg():
help='Perform a trial run but only prints the notification '
'services to-be triggered to stdout. Notifications are never '
'sent using this mode.')
@click.option('--recursion-depth', '-R', default=DEFAULT_RECURSION_DEPTH,
type=int,
help='The number of recursive import entries that can be '
'loaded from within Apprise configuration. By default '
'this is set to {}.'.format(DEFAULT_RECURSION_DEPTH))
@click.option('--verbose', '-v', count=True,
help='Makes the operation more talkative. Use multiple v to '
'increase the verbosity. I.e.: -vvvv')
@ -138,7 +147,8 @@ def print_version_msg():
@click.argument('urls', nargs=-1,
metavar='SERVER_URL [SERVER_URL2 [SERVER_URL3]]',)
def main(body, title, config, attach, urls, notification_type, theme, tag,
input_format, dry_run, verbose, disable_async, debug, version):
input_format, dry_run, recursion_depth, verbose, disable_async,
debug, version):
"""
Send a notification to all of the specified servers identified by their
URLs the content provided within the title, body and notification-type.
@ -199,14 +209,18 @@ def main(body, title, config, attach, urls, notification_type, theme, tag,
logger.error(
'The --notification-type (-n) value of {} is not supported.'
.format(notification_type))
sys.exit(1)
# 2 is the same exit code returned by Click if there is a parameter
# issue. For consistency, we also return a 2
sys.exit(2)
input_format = input_format.strip().lower()
if input_format not in NOTIFY_FORMATS:
logger.error(
'The --input-format (-i) value of {} is not supported.'
.format(input_format))
sys.exit(1)
# 2 is the same exit code returned by Click if there is a parameter
# issue. For consistency, we also return a 2
sys.exit(2)
# Prepare our asset
asset = AppriseAsset(
@ -227,7 +241,7 @@ def main(body, title, config, attach, urls, notification_type, theme, tag,
a.add(AppriseConfig(
paths=[f for f in DEFAULT_SEARCH_PATHS if isfile(expanduser(f))]
if not (config or urls) else config,
asset=asset))
asset=asset, recursion=recursion_depth))
# Load our inventory up
for url in urls:
@ -281,7 +295,10 @@ def main(body, title, config, attach, urls, notification_type, theme, tag,
# There were no notifications set. This is a result of just having
# empty configuration files and/or being to restrictive when filtering
# by specific tag(s)
sys.exit(2)
# Exit code 3 is used since Click uses exit code 2 if there is an
# error with the parameters specified
sys.exit(3)
elif result is False:
# At least 1 notification service failed to send

View File

@ -129,6 +129,31 @@ CONFIG_FORMATS = (
ConfigFormat.YAML,
)
class ConfigIncludeMode(object):
"""
The different Cofiguration inclusion modes. All Configuration
plugins will have one of these associated with it.
"""
# - Configuration inclusion of same type only; hence a file:// can include
# a file://
# - Cross file inclusion is not allowed unless insecure_includes (a flag)
# is set to True. In these cases STRICT acts as type ALWAYS
STRICT = 'strict'
# This configuration type can never be included
NEVER = 'never'
# File configuration can always be included
ALWAYS = 'always'
CONFIG_INCLUDE_MODES = (
ConfigIncludeMode.STRICT,
ConfigIncludeMode.NEVER,
ConfigIncludeMode.ALWAYS,
)
# This is a reserved tag that is automatically assigned to every
# Notification Plugin
MATCH_ALL_TAG = 'all'

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# Copyright (C) 2020 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
@ -34,9 +34,11 @@ from ..AppriseAsset import AppriseAsset
from ..URLBase import URLBase
from ..common import ConfigFormat
from ..common import CONFIG_FORMATS
from ..common import ConfigIncludeMode
from ..utils import GET_SCHEMA_RE
from ..utils import parse_list
from ..utils import parse_bool
from . import SCHEMA_MAP
class ConfigBase(URLBase):
@ -60,7 +62,15 @@ class ConfigBase(URLBase):
# anything else. 128KB (131072B)
max_buffer_size = 131072
def __init__(self, cache=True, **kwargs):
# By default all configuration is not includable using the 'include'
# line found in configuration files.
allow_cross_includes = ConfigIncludeMode.NEVER
# the config path manages the handling of relative include
config_path = os.getcwd()
def __init__(self, cache=True, recursion=0, insecure_includes=False,
**kwargs):
"""
Initialize some general logging and common server arguments that will
keep things consistent when working with the configurations that
@ -76,6 +86,29 @@ class ConfigBase(URLBase):
You can alternatively set the cache value to an int identifying the
number of seconds the previously retrieved can exist for before it
should be considered expired.
recursion defines how deep we recursively handle entries that use the
`include` keyword. This keyword requires us to fetch more configuration
from another source and add it to our existing compilation. If the
file we remotely retrieve also has an `include` reference, we will only
advance through it if recursion is set to 2 deep. If set to zero
it is off. There is no limit to how high you set this value. It would
be recommended to keep it low if you do intend to use it.
insecure_include by default are disabled. When set to True, all
Apprise Config files marked to be in STRICT mode are treated as being
in ALWAYS mode.
Take a file:// based configuration for example, only a file:// based
configuration can include another file:// based one. because it is set
to STRICT mode. If an http:// based configuration file attempted to
include a file:// one it woul fail. However this include would be
possible if insecure_includes is set to True.
There are cases where a self hosting apprise developer may wish to load
configuration from memory (in a string format) that contains 'include'
entries (even file:// based ones). In these circumstances if you want
these 'include' entries to be honored, this value must be set to True.
"""
super(ConfigBase, self).__init__(**kwargs)
@ -88,6 +121,12 @@ class ConfigBase(URLBase):
# Tracks previously loaded content for speed
self._cached_servers = None
# Initialize our recursion value
self.recursion = recursion
# Initialize our insecure_includes flag
self.insecure_includes = insecure_includes
if 'encoding' in kwargs:
# Store the encoding
self.encoding = kwargs.get('encoding')
@ -157,15 +196,107 @@ class ConfigBase(URLBase):
# Initialize our asset object
asset = asset if isinstance(asset, AppriseAsset) else self.asset
# Execute our config parse function which always returns a list
self._cached_servers.extend(fn(content=content, asset=asset))
# Execute our config parse function which always returns a tuple
# of our servers and our configuration
servers, configs = fn(content=content, asset=asset)
self._cached_servers.extend(servers)
if len(self._cached_servers):
# Configuration files were detected; recursively populate them
# If we have been configured to do so
for url in configs:
if self.recursion > 0:
# Attempt to acquire the schema at the very least to allow
# our configuration based urls.
schema = GET_SCHEMA_RE.match(url)
if schema is None:
# Plan B is to assume we're dealing with a file
schema = 'file'
if not os.path.isabs(url):
# We're dealing with a relative path; prepend
# our current config path
url = os.path.join(self.config_path, url)
url = '{}://{}'.format(schema, URLBase.quote(url))
else:
# Ensure our schema is always in lower case
schema = schema.group('schema').lower()
# Some basic validation
if schema not in SCHEMA_MAP:
ConfigBase.logger.warning(
'Unsupported include schema {}.'.format(schema))
continue
# Parse our url details of the server object as dictionary
# containing all of the information parsed from our URL
results = SCHEMA_MAP[schema].parse_url(url)
if not results:
# Failed to parse the server URL
self.logger.warning(
'Unparseable include URL {}'.format(url))
continue
# Handle cross inclusion based on allow_cross_includes rules
if (SCHEMA_MAP[schema].allow_cross_includes ==
ConfigIncludeMode.STRICT
and schema not in self.schemas()
and not self.insecure_includes) or \
SCHEMA_MAP[schema].allow_cross_includes == \
ConfigIncludeMode.NEVER:
# Prevent the loading if insecure base protocols
ConfigBase.logger.warning(
'Including {}:// based configuration is prohibited. '
'Ignoring URL {}'.format(schema, url))
continue
# Prepare our Asset Object
results['asset'] = asset
# No cache is required because we're just lumping this in
# and associating it with the cache value we've already
# declared (prior to our recursion)
results['cache'] = False
# Recursion can never be parsed from the URL; we decrement
# it one level
results['recursion'] = self.recursion - 1
# Insecure Includes flag can never be parsed from the URL
results['insecure_includes'] = self.insecure_includes
try:
# Attempt to create an instance of our plugin using the
# parsed URL information
cfg_plugin = SCHEMA_MAP[results['schema']](**results)
except Exception as e:
# the arguments are invalid or can not be used.
self.logger.warning(
'Could not load include URL: {}'.format(url))
self.logger.debug('Loading Exception: {}'.format(str(e)))
continue
# if we reach here, we can now add this servers found
# in this configuration file to our list
self._cached_servers.extend(
cfg_plugin.servers(asset=asset))
# We no longer need our configuration object
del cfg_plugin
else:
self.logger.debug(
'Recursion limit reached; ignoring Include URL: %s' % url)
if self._cached_servers:
self.logger.info('Loaded {} entries from {}'.format(
len(self._cached_servers), self.url()))
else:
self.logger.warning('Failed to load configuration from {}'.format(
self.url()))
self.logger.warning(
'Failed to load Apprise configuration from {}'.format(
self.url()))
# Set the time our content was cached at
self._cached_time = time.time()
@ -285,7 +416,8 @@ class ConfigBase(URLBase):
except TypeError:
# content was not expected string type
ConfigBase.logger.error('Invalid apprise config specified')
ConfigBase.logger.error(
'Invalid Apprise configuration specified.')
return None
# By default set our return value to None since we don't know
@ -300,7 +432,7 @@ class ConfigBase(URLBase):
if not result:
# Invalid syntax
ConfigBase.logger.error(
'Undetectable apprise configuration found '
'Undetectable Apprise configuration found '
'based on line {}.'.format(line))
# Take an early exit
return None
@ -341,14 +473,14 @@ class ConfigBase(URLBase):
if not config_format:
# We couldn't detect configuration
ConfigBase.logger.error('Could not detect configuration')
return list()
return (list(), list())
if config_format not in CONFIG_FORMATS:
# Invalid configuration type specified
ConfigBase.logger.error(
'An invalid configuration format ({}) was specified'.format(
config_format))
return list()
return (list(), list())
# Dynamically load our parse_ function based on our config format
fn = getattr(ConfigBase, 'config_parse_{}'.format(config_format))
@ -360,9 +492,14 @@ class ConfigBase(URLBase):
def config_parse_text(content, asset=None):
"""
Parse the specified content as though it were a simple text file only
containing a list of URLs. Return a list of loaded notification plugins
containing a list of URLs.
Optionally associate an asset with the notification.
Return a tuple that looks like (servers, configs) where:
- servers contains a list of loaded notification plugins
- configs contains a list of additional configuration files
referenced.
You may also optionally associate an asset with the notification.
The file syntax is:
@ -376,14 +513,25 @@ class ConfigBase(URLBase):
# Or you can use this format (no tags associated)
<URL>
# you can also use the keyword 'include' and identify a
# configuration location (like this file) which will be included
# as additional configuration entries when loaded.
include <ConfigURL>
"""
response = list()
# A list of loaded Notification Services
servers = list()
# A list of additional configuration files referenced using
# the include keyword
configs = list()
# Define what a valid line should look like
valid_line_re = re.compile(
r'^\s*(?P<line>([;#]+(?P<comment>.*))|'
r'(\s*(?P<tags>[^=]+)=|=)?\s*'
r'(?P<url>[a-z0-9]{2,9}://.*))?$', re.I)
r'(?P<url>[a-z0-9]{2,9}://.*)|'
r'include\s+(?P<config>.+))?\s*$', re.I)
try:
# split our content up to read line by line
@ -391,28 +539,35 @@ class ConfigBase(URLBase):
except TypeError:
# content was not expected string type
ConfigBase.logger.error('Invalid apprise text data specified')
return list()
ConfigBase.logger.error(
'Invalid Apprise TEXT based configuration specified.')
return (list(), list())
for line, entry in enumerate(content, start=1):
result = valid_line_re.match(entry)
if not result:
# Invalid syntax
ConfigBase.logger.error(
'Invalid apprise text format found '
'Invalid Apprise TEXT configuration format found '
'{} on line {}.'.format(entry, line))
# Assume this is a file we shouldn't be parsing. It's owner
# can read the error printed to screen and take action
# otherwise.
return list()
return (list(), list())
# Store our url read in
url = result.group('url')
if not url:
url, config = result.group('url'), result.group('config')
if not (url or config):
# Comment/empty line; do nothing
continue
if config:
ConfigBase.logger.debug('Include URL: {}'.format(config))
# Store our include line
configs.append(config.strip())
continue
# Acquire our url tokens
results = plugins.url_to_dict(url)
if results is None:
@ -446,23 +601,32 @@ class ConfigBase(URLBase):
continue
# if we reach here, we successfully loaded our data
response.append(plugin)
servers.append(plugin)
# Return what was loaded
return response
return (servers, configs)
@staticmethod
def config_parse_yaml(content, asset=None):
"""
Parse the specified content as though it were a yaml file
specifically formatted for apprise. Return a list of loaded
notification plugins.
specifically formatted for Apprise.
Optionally associate an asset with the notification.
Return a tuple that looks like (servers, configs) where:
- servers contains a list of loaded notification plugins
- configs contains a list of additional configuration files
referenced.
You may optionally associate an asset with the notification.
"""
response = list()
# A list of loaded Notification Services
servers = list()
# A list of additional configuration files referenced using
# the include keyword
configs = list()
try:
# Load our data (safely)
@ -471,23 +635,24 @@ class ConfigBase(URLBase):
except (AttributeError, yaml.error.MarkedYAMLError) as e:
# Invalid content
ConfigBase.logger.error(
'Invalid apprise yaml data specified.')
'Invalid Apprise YAML data specified.')
ConfigBase.logger.debug(
'YAML Exception:{}{}'.format(os.linesep, e))
return list()
return (list(), list())
if not isinstance(result, dict):
# Invalid content
ConfigBase.logger.error('Invalid apprise yaml structure specified')
return list()
ConfigBase.logger.error(
'Invalid Apprise YAML based configuration specified.')
return (list(), list())
# YAML Version
version = result.get('version', 1)
if version != 1:
# Invalid syntax
ConfigBase.logger.error(
'Invalid apprise yaml version specified {}.'.format(version))
return list()
'Invalid Apprise YAML version specified {}.'.format(version))
return (list(), list())
#
# global asset object
@ -534,15 +699,36 @@ class ConfigBase(URLBase):
# Store any preset tags
global_tags = set(parse_list(tags))
#
# include root directive
#
includes = result.get('include', None)
if isinstance(includes, six.string_types):
# Support a single inline string
includes = list([includes])
elif not isinstance(includes, (list, tuple)):
# Not a problem; we simply have no includes
includes = list()
# Iterate over each config URL
for no, url in enumerate(includes):
if isinstance(url, six.string_types):
# We're just a simple URL string...
configs.append(url)
elif isinstance(url, dict):
# Store the url and ignore arguments associated
configs.extend(u for u in url.keys())
#
# urls root directive
#
urls = result.get('urls', None)
if not isinstance(urls, (list, tuple)):
# Unsupported
ConfigBase.logger.error(
'Missing "urls" directive in apprise yaml.')
return list()
# Not a problem; we simply have no urls
urls = list()
# Iterate over each URL
for no, url in enumerate(urls):
@ -654,7 +840,7 @@ class ConfigBase(URLBase):
else:
# Unsupported
ConfigBase.logger.warning(
'Unsupported apprise yaml entry #{}'.format(no + 1))
'Unsupported Apprise YAML entry #{}'.format(no + 1))
continue
# Track our entries
@ -667,7 +853,7 @@ class ConfigBase(URLBase):
# Grab our first item
_results = results.pop(0)
# tag is a special keyword that is managed by apprise object.
# tag is a special keyword that is managed by Apprise object.
# The below ensures our tags are set correctly
if 'tag' in _results:
# Tidy our list up
@ -696,17 +882,19 @@ class ConfigBase(URLBase):
ConfigBase.logger.debug(
'Loaded URL: {}'.format(plugin.url()))
except Exception:
except Exception as e:
# the arguments are invalid or can not be used.
ConfigBase.logger.warning(
'Could not load apprise yaml entry #{}, item #{}'
'Could not load Apprise YAML configuration '
'entry #{}, item #{}'
.format(no + 1, entry))
ConfigBase.logger.debug('Loading Exception: %s' % str(e))
continue
# if we reach here, we successfully loaded our data
response.append(plugin)
servers.append(plugin)
return response
return (servers, configs)
def pop(self, index=-1):
"""

View File

@ -28,6 +28,7 @@ import io
import os
from .ConfigBase import ConfigBase
from ..common import ConfigFormat
from ..common import ConfigIncludeMode
from ..AppriseLocale import gettext_lazy as _
@ -42,6 +43,9 @@ class ConfigFile(ConfigBase):
# The default protocol
protocol = 'file'
# Configuration file inclusion can only be of the same type
allow_cross_includes = ConfigIncludeMode.STRICT
def __init__(self, path, **kwargs):
"""
Initialize File Object
@ -53,7 +57,10 @@ class ConfigFile(ConfigBase):
super(ConfigFile, self).__init__(**kwargs)
# Store our file path as it was set
self.path = os.path.expanduser(path)
self.path = os.path.abspath(os.path.expanduser(path))
# Update the config path to be relative to our file we just loaded
self.config_path = os.path.dirname(self.path)
return
@ -91,10 +98,9 @@ class ConfigFile(ConfigBase):
response = None
path = os.path.expanduser(self.path)
try:
if self.max_buffer_size > 0 and \
os.path.getsize(path) > self.max_buffer_size:
os.path.getsize(self.path) > self.max_buffer_size:
# Content exceeds maximum buffer size
self.logger.error(
@ -106,7 +112,7 @@ class ConfigFile(ConfigBase):
# getsize() can throw this acception if the file is missing
# and or simply isn't accessible
self.logger.error(
'File is not accessible: {}'.format(path))
'File is not accessible: {}'.format(self.path))
return None
# Always call throttle before any server i/o is made
@ -115,7 +121,7 @@ class ConfigFile(ConfigBase):
try:
# Python 3 just supports open(), however to remain compatible with
# Python 2, we use the io module
with io.open(path, "rt", encoding=self.encoding) as f:
with io.open(self.path, "rt", encoding=self.encoding) as f:
# Store our content for parsing
response = f.read()
@ -126,7 +132,7 @@ class ConfigFile(ConfigBase):
self.logger.error(
'File not using expected encoding ({}) : {}'.format(
self.encoding, path))
self.encoding, self.path))
return None
except (IOError, OSError):
@ -136,13 +142,13 @@ class ConfigFile(ConfigBase):
# Could not open and/or read the file; this is not a problem since
# we scan a lot of default paths.
self.logger.error(
'File can not be opened for read: {}'.format(path))
'File can not be opened for read: {}'.format(self.path))
return None
# Detect config format based on file extension if it isn't already
# enforced
if self.config_format is None and \
re.match(r'^.*\.ya?ml\s*$', path, re.I) is not None:
re.match(r'^.*\.ya?ml\s*$', self.path, re.I) is not None:
# YAML Filename Detected
self.default_config_format = ConfigFormat.YAML
@ -163,7 +169,7 @@ class ConfigFile(ConfigBase):
# We're done early; it's not a good URL
return results
match = re.match(r'file://(?P<path>[^?]+)(\?.*)?', url, re.I)
match = re.match(r'[a-z0-9]+://(?P<path>[^?]+)(\?.*)?', url, re.I)
if not match:
return None

View File

@ -28,6 +28,7 @@ import six
import requests
from .ConfigBase import ConfigBase
from ..common import ConfigFormat
from ..common import ConfigIncludeMode
from ..URLBase import PrivacyMode
from ..AppriseLocale import gettext_lazy as _
@ -64,6 +65,9 @@ class ConfigHTTP(ConfigBase):
# from queries to services that may be untrusted.
max_error_buffer_size = 2048
# Configuration file inclusion can always include this type
allow_cross_includes = ConfigIncludeMode.ALWAYS
def __init__(self, headers=None, **kwargs):
"""
Initialize HTTP Object

View File

@ -23,12 +23,12 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import six
import re
import six
from os import listdir
from os.path import dirname
from os.path import abspath
from ..logger import logger
# Maintains a mapping of all of the configuration services
SCHEMA_MAP = {}
@ -88,29 +88,39 @@ def __load_matrix(path=abspath(dirname(__file__)), name='apprise.config'):
# not the module:
globals()[plugin_name] = plugin
# Load protocol(s) if defined
proto = getattr(plugin, 'protocol', None)
if isinstance(proto, six.string_types):
if proto not in SCHEMA_MAP:
SCHEMA_MAP[proto] = plugin
fn = getattr(plugin, 'schemas', None)
try:
schemas = set([]) if not callable(fn) else fn(plugin)
elif isinstance(proto, (set, list, tuple)):
# Support iterables list types
for p in proto:
if p not in SCHEMA_MAP:
SCHEMA_MAP[p] = plugin
except TypeError:
# Python v2.x support where functions associated with classes
# were considered bound to them and could not be called prior
# to the classes initialization. This code can be dropped
# once Python v2.x support is dropped. The below code introduces
# replication as it already exists and is tested in
# URLBase.schemas()
schemas = set([])
for key in ('protocol', 'secure_protocol'):
schema = getattr(plugin, key, None)
if isinstance(schema, six.string_types):
schemas.add(schema)
# Load secure protocol(s) if defined
protos = getattr(plugin, 'secure_protocol', None)
if isinstance(protos, six.string_types):
if protos not in SCHEMA_MAP:
SCHEMA_MAP[protos] = plugin
elif isinstance(schema, (set, list, tuple)):
# Support iterables list types
for s in schema:
if isinstance(s, six.string_types):
schemas.add(s)
if isinstance(protos, (set, list, tuple)):
# Support iterables list types
for p in protos:
if p not in SCHEMA_MAP:
SCHEMA_MAP[p] = plugin
# map our schema to our plugin
for schema in schemas:
if schema in SCHEMA_MAP:
logger.error(
"Config schema ({}) mismatch detected - {} to {}"
.format(schema, SCHEMA_MAP[schema], plugin))
continue
# Assign plugin
SCHEMA_MAP[schema] = plugin
return SCHEMA_MAP

View File

@ -128,29 +128,39 @@ def __load_matrix(path=abspath(dirname(__file__)), name='apprise.plugins'):
# Load our module into memory so it's accessible to all
globals()[plugin_name] = plugin
# Load protocol(s) if defined
proto = getattr(plugin, 'protocol', None)
if isinstance(proto, six.string_types):
if proto not in SCHEMA_MAP:
SCHEMA_MAP[proto] = plugin
fn = getattr(plugin, 'schemas', None)
try:
schemas = set([]) if not callable(fn) else fn(plugin)
elif isinstance(proto, (set, list, tuple)):
# Support iterables list types
for p in proto:
if p not in SCHEMA_MAP:
SCHEMA_MAP[p] = plugin
except TypeError:
# Python v2.x support where functions associated with classes
# were considered bound to them and could not be called prior
# to the classes initialization. This code can be dropped
# once Python v2.x support is dropped. The below code introduces
# replication as it already exists and is tested in
# URLBase.schemas()
schemas = set([])
for key in ('protocol', 'secure_protocol'):
schema = getattr(plugin, key, None)
if isinstance(schema, six.string_types):
schemas.add(schema)
# Load secure protocol(s) if defined
protos = getattr(plugin, 'secure_protocol', None)
if isinstance(protos, six.string_types):
if protos not in SCHEMA_MAP:
SCHEMA_MAP[protos] = plugin
elif isinstance(schema, (set, list, tuple)):
# Support iterables list types
for s in schema:
if isinstance(s, six.string_types):
schemas.add(s)
if isinstance(protos, (set, list, tuple)):
# Support iterables list types
for p in protos:
if p not in SCHEMA_MAP:
SCHEMA_MAP[p] = plugin
# map our schema to our plugin
for schema in schemas:
if schema in SCHEMA_MAP:
logger.error(
"Notification schema ({}) mismatch detected - {} to {}"
.format(schema, SCHEMA_MAP[schema], plugin))
continue
# Assign plugin
SCHEMA_MAP[schema] = plugin
return SCHEMA_MAP

View File

@ -73,6 +73,10 @@ The more of these you specify, the more verbose the output is\.
Send notifications synchronously (one after the other) instead of all at once\.
.
.TP
\fB\-R\fR, \fB\-\-recursion\-depth\fR
he number of recursive import entries that can be loaded from within Apprise configuration\. By default this is set to 1\. If this is set to zero, then import statements found in any configuration is ignored\.
.
.TP
\fB\-D\fR, \fB\-\-debug\fR
A debug mode; useful for troubleshooting\.
.
@ -85,10 +89,10 @@ Display the apprise version and exit\.
Show this message and exit\.
.
.SH "EXIT STATUS"
\fBapprise\fR exits with a status 0 if all notifications were sent successfully otherwise \fBapprise\fR returns a value of 1\.
\fBapprise\fR exits with a status 0 if all notifications were sent successfully otherwise \fBapprise\fR returns a value of 1\. \fBapprise\fR returns a value of 2 if there was an error specified on the command line (such as not providing an valid argument)\.
.
.P
\fBapprise\fR exits with a status of 2 if there were no notifcations sent due (as a result of end user actions)\. This occurs in the case where you have assigned one or more tags to all of the Apprise URLs being notified and did not match any when actually executing the \fBapprise\fR tool\. This can also occur if you specified a tag that has not been assigned to anything defined in your configuration\.
\fBapprise\fR exits with a status of 3 if there were no notifcations sent due (as a result of end user actions)\. This occurs in the case where you have assigned one or more tags to all of the Apprise URLs being notified and did not match any when actually executing the \fBapprise\fR tool\. This can also occur if you specified a tag that has not been assigned to anything defined in your configuration\.
.
.SH "SERVICE URLS"
There are to many service URL and combinations to list here\. It\'s best to visit the Apprise GitHub page \fIhttps://github\.com/caronc/apprise/wiki#notification\-services\fR and see what\'s available\.
@ -100,7 +104,7 @@ Send a notification to as many servers as you want to specify as you can easily
.
.nf
$ apprise \-t \'my title\' \-b \'my notification body\' \e
$ apprise \-vv \-t \'my title\' \-b \'my notification body\' \e
\'mailto://myemail:mypass@gmail\.com\' \e
\'pbul://o\.gn5kj6nfhv736I7jC3cj3QLRiyhgl98b\'
.
@ -115,7 +119,7 @@ If you don\'t specify a \fB\-\-body\fR (\fB\-b\fR) then stdin is used allowing y
.
.nf
$ cat /proc/cpuinfo | apprise \-t \'cpu info\' \e
$ cat /proc/cpuinfo | apprise \-vv \-t \'cpu info\' \e
\'mailto://myemail:mypass@gmail\.com\'
.
.fi
@ -129,7 +133,7 @@ Load in a configuration file which identifies all of your notification service U
.
.nf
$ apprise \-t \'my title\' \-b \'my notification body\' \e
$ apprise \-vv \-t \'my title\' \-b \'my notification body\' \e
\-\-config=~/apprise\.yml
.
.fi
@ -143,7 +147,7 @@ Load in a configuration file from a remote server that identifies all of your no
.
.nf
$ apprise \-t \'my title\' \-b \'my notification body\' \e
$ apprise \-vv \-t \'my title\' \-b \'my notification body\' \e
\-\-config=https://localhost/my/apprise/config \e
\-t devops
.
@ -158,7 +162,7 @@ Include an attachment:
.
.nf
$ apprise \-t \'School Assignment\' \-b \'See attached\' \e
$ apprise \-vv \-t \'School Assignment\' \-b \'See attached\' \e
\-\-attach=Documents/FinalReport\.docx
.
.fi

View File

@ -60,6 +60,11 @@ The Apprise options are as follows:
Send notifications synchronously (one after the other) instead of
all at once.
* `-R`, `--recursion-depth`:
he number of recursive import entries that can be loaded from within
Apprise configuration. By default this is set to 1. If this is set to
zero, then import statements found in any configuration is ignored.
* `-D`, `--debug`:
A debug mode; useful for troubleshooting.
@ -71,9 +76,11 @@ The Apprise options are as follows:
## EXIT STATUS
**apprise** exits with a status 0 if all notifications were sent successfully otherwise **apprise** returns a value of 1.
**apprise** exits with a status 0 if all notifications were sent successfully otherwise **apprise** returns a value of 1. **apprise** returns a value of 2 if
there was an error specified on the command line (such as not providing an valid
argument).
**apprise** exits with a status of 2 if there were no notifcations sent due (as a result of end user actions). This occurs in the case where you have assigned one or more tags to all of the Apprise URLs being notified and did not match any when actually executing the **apprise** tool. This can also occur if you specified a tag that has not been assigned to anything defined in your configuration.
**apprise** exits with a status of 3 if there were no notifcations sent due (as a result of end user actions). This occurs in the case where you have assigned one or more tags to all of the Apprise URLs being notified and did not match any when actually executing the **apprise** tool. This can also occur if you specified a tag that has not been assigned to anything defined in your configuration.
## SERVICE URLS
@ -88,32 +95,32 @@ visit the [Apprise GitHub page][serviceurls] and see what's available.
Send a notification to as many servers as you want to specify as you can
easily chain them together:
$ apprise -t 'my title' -b 'my notification body' \
$ apprise -vv -t 'my title' -b 'my notification body' \
'mailto://myemail:mypass@gmail.com' \
'pbul://o.gn5kj6nfhv736I7jC3cj3QLRiyhgl98b'
If you don't specify a **--body** (**-b**) then stdin is used allowing you to
use the tool as part of your every day administration:
$ cat /proc/cpuinfo | apprise -t 'cpu info' \
$ cat /proc/cpuinfo | apprise -vv -t 'cpu info' \
'mailto://myemail:mypass@gmail.com'
Load in a configuration file which identifies all of your notification service
URLs and notify them all:
$ apprise -t 'my title' -b 'my notification body' \
$ apprise -vv -t 'my title' -b 'my notification body' \
--config=~/apprise.yml
Load in a configuration file from a remote server that identifies all of your
notification service URLs and only notify the ones tagged as _devops_.
$ apprise -t 'my title' -b 'my notification body' \
$ apprise -vv -t 'my title' -b 'my notification body' \
--config=https://localhost/my/apprise/config \
-t devops
Include an attachment:
$ apprise -t 'School Assignment' -b 'See attached' \
$ apprise -vv -t 'School Assignment' -b 'See attached' \
--attach=Documents/FinalReport.docx
## BUGS

View File

@ -168,6 +168,11 @@ def test_apprise():
# Support URL
return ''
@staticmethod
def parse_url(url, *args, **kwargs):
# always parseable
return NotifyBase.parse_url(url, verify_host=False)
class GoodNotification(NotifyBase):
def __init__(self, **kwargs):
super(GoodNotification, self).__init__(
@ -181,6 +186,11 @@ def test_apprise():
# Pretend everything is okay
return True
@staticmethod
def parse_url(url, *args, **kwargs):
# always parseable
return NotifyBase.parse_url(url, verify_host=False)
# Store our bad notification in our schema map
SCHEMA_MAP['bad'] = BadNotification
@ -588,9 +598,71 @@ def test_apprise_tagging(mock_post, mock_get):
tag=[(object, ), ]) is None
@pytest.mark.skipif(sys.version_info.major <= 2, reason="Requires Python 3.x+")
def test_apprise_schemas(tmpdir):
"""
API: Apprise().schema() tests
"""
# Caling load matix a second time which is an internal function causes it
# to skip over content already loaded into our matrix and thefore accesses
# other if/else parts of the code that aren't otherwise called
__load_matrix()
a = Apprise()
# no items
assert len(a) == 0
class TextNotification(NotifyBase):
# set our default notification format
notify_format = NotifyFormat.TEXT
# Garbage Protocol Entries
protocol = None
secure_protocol = (None, object)
class HtmlNotification(NotifyBase):
protocol = ('html', 'htm')
secure_protocol = ('htmls', 'htms')
class MarkDownNotification(NotifyBase):
protocol = 'markdown'
secure_protocol = 'markdowns'
# Store our notifications into our schema map
SCHEMA_MAP['text'] = TextNotification
SCHEMA_MAP['html'] = HtmlNotification
SCHEMA_MAP['markdown'] = MarkDownNotification
schemas = URLBase.schemas(TextNotification)
assert isinstance(schemas, set) is True
# We didn't define a protocol or secure protocol
assert len(schemas) == 0
schemas = URLBase.schemas(HtmlNotification)
assert isinstance(schemas, set) is True
assert len(schemas) == 4
assert 'html' in schemas
assert 'htm' in schemas
assert 'htmls' in schemas
assert 'htms' in schemas
# Invalid entries do not disrupt schema calls
for garbage in (object(), None, 42):
schemas = URLBase.schemas(garbage)
assert isinstance(schemas, set) is True
assert len(schemas) == 0
def test_apprise_notify_formats(tmpdir):
"""
API: Apprise() TextFormat tests
API: Apprise() Input Formats tests
"""
# Caling load matix a second time which is an internal function causes it
@ -1371,10 +1443,15 @@ class NotifyGoober(NotifyBase):
# trying to over-ride items previously used
# The default simple (insecure) protocol (used by NotifyMail)
protocol = 'mailto'
protocol = ('mailto', 'goober')
# The default secure protocol (used by NotifyMail)
secure_protocol = 'mailtos'""")
secure_protocol = 'mailtos'
@staticmethod
def parse_url(url, *args, **kwargs):
# always parseable
return ConfigBase.parse_url(url, verify_host=False)""")
# Utilizes a schema:// already occupied (as tuple)
base.join('NotifyBugger.py').write("""
@ -1388,6 +1465,11 @@ class NotifyBugger(NotifyBase):
protocol = ('mailto', 'bugger-test' )
# The default secure protocol (used by NotifyMail), the other isn't
secure_protocol = ('mailtos', 'bugger-tests')""")
secure_protocol = ('mailtos', ['garbage'])
@staticmethod
def parse_url(url, *args, **kwargs):
# always parseable
return ConfigBase.parse_url(url, verify_host=False)""")
__load_matrix(path=str(base), name=module_name)

View File

@ -23,11 +23,13 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import sys
import six
import io
import mock
import pytest
from apprise import NotifyFormat
from apprise import ConfigIncludeMode
from apprise.Apprise import Apprise
from apprise.AppriseConfig import AppriseConfig
from apprise.AppriseAsset import AppriseAsset
@ -379,12 +381,20 @@ def test_apprise_config_instantiate():
'invalid://?', suppress_exceptions=True) is None
class BadConfig(ConfigBase):
# always allow incusion
allow_cross_includes = ConfigIncludeMode.ALWAYS
def __init__(self, **kwargs):
super(BadConfig, self).__init__(**kwargs)
# We fail whenever we're initialized
raise TypeError()
@staticmethod
def parse_url(url, *args, **kwargs):
# always parseable
return ConfigBase.parse_url(url, verify_host=False)
# Store our bad configuration in our schema map
CONFIG_SCHEMA_MAP['bad'] = BadConfig
@ -397,9 +407,67 @@ def test_apprise_config_instantiate():
'bad://path', suppress_exceptions=True) is None
def test_invalid_apprise_config(tmpdir):
"""
Parse invalid configuration includes
"""
class BadConfig(ConfigBase):
# always allow incusion
allow_cross_includes = ConfigIncludeMode.ALWAYS
def __init__(self, **kwargs):
super(BadConfig, self).__init__(**kwargs)
# We intentionally fail whenever we're initialized
raise TypeError()
@staticmethod
def parse_url(url, *args, **kwargs):
# always parseable
return ConfigBase.parse_url(url, verify_host=False)
# Store our bad configuration in our schema map
CONFIG_SCHEMA_MAP['bad'] = BadConfig
# temporary file to work with
t = tmpdir.mkdir("apprise-bad-obj").join("invalid")
buf = """
# Include an invalid schema
include invalid://
# An unparsable valid schema
include https://
# A valid configuration that will throw an exception
include bad://
# Include ourselves (So our recursive includes fails as well)
include {}
""".format(str(t))
t.write(buf)
# Create ourselves a config object with caching disbled
ac = AppriseConfig(recursion=2, insecure_includes=True, cache=False)
# Nothing loaded yet
assert len(ac) == 0
# Add our config
assert ac.add(configs=str(t), asset=AppriseAsset()) is True
# One configuration file
assert len(ac) == 1
# All of the servers were invalid and would not load
assert len(ac.servers()) == 0
def test_apprise_config_with_apprise_obj(tmpdir):
"""
API: ConfigBase.parse_inaccessible_text_file
API: ConfigBase - parse valid config
"""
@ -428,7 +496,7 @@ def test_apprise_config_with_apprise_obj(tmpdir):
# Store our good notification in our schema map
NOTIFY_SCHEMA_MAP['good'] = GoodNotification
# Create ourselves a config object with caching disbled
# Create ourselves a config object
ac = AppriseConfig(cache=False)
# Nothing loaded yet
@ -578,6 +646,197 @@ def test_apprise_config_with_apprise_obj(tmpdir):
assert isinstance(a.pop(len(a) - 1), NotifyBase) is True
def test_recursive_config_inclusion(tmpdir):
"""
API: Apprise() Recursive Config Inclusion
"""
# To test our config classes, we make three dummy configs
class ConfigCrossPostAlways(ConfigFile):
"""
A dummy config that is set to always allow inclusion
"""
service_name = 'always'
# protocol
protocol = 'always'
# Always type
allow_cross_includes = ConfigIncludeMode.ALWAYS
class ConfigCrossPostStrict(ConfigFile):
"""
A dummy config that is set to strict inclusion
"""
service_name = 'strict'
# protocol
protocol = 'strict'
# Always type
allow_cross_includes = ConfigIncludeMode.STRICT
class ConfigCrossPostNever(ConfigFile):
"""
A dummy config that is set to never allow inclusion
"""
service_name = 'never'
# protocol
protocol = 'never'
# Always type
allow_cross_includes = ConfigIncludeMode.NEVER
# store our entries
CONFIG_SCHEMA_MAP['never'] = ConfigCrossPostNever
CONFIG_SCHEMA_MAP['strict'] = ConfigCrossPostStrict
CONFIG_SCHEMA_MAP['always'] = ConfigCrossPostAlways
# Make our new path valid
suite = tmpdir.mkdir("apprise_config_recursion")
cfg01 = suite.join("cfg01.cfg")
cfg02 = suite.mkdir("dir1").join("cfg02.cfg")
cfg03 = suite.mkdir("dir2").join("cfg03.cfg")
cfg04 = suite.mkdir("dir3").join("cfg04.cfg")
# Populate our files with valid configuration include lines
cfg01.write("""
# json entry
json://localhost:8080
# absolute path inclusion to ourselves
include {}""".format(str(cfg01)))
cfg02.write("""
# syslog entry
syslog://
# recursively include ourselves
include cfg02.cfg""")
cfg03.write("""
# xml entry
xml://localhost:8080
# relative path inclusion
include ../dir1/cfg02.cfg
# test that we can't include invalid entries
include invalid://entry
# Include non includable type
include memory://""")
cfg04.write("""
# xml entry
xml://localhost:8080
# always include of our file
include always://{}
# never include of our file
include never://{}
# strict include of our file
include strict://{}""".format(str(cfg04), str(cfg04), str(cfg04)))
# Create ourselves a config object
ac = AppriseConfig()
# There are no servers loaded
assert len(ac) == 0
# load our configuration
assert ac.add(configs=str(cfg01)) is True
# verify it loaded
assert len(ac) == 1
# 1 service will be loaded as there is no recursion at this point
assert len(ac.servers()) == 1
# Create ourselves a config object
ac = AppriseConfig(recursion=1)
# load our configuration
assert ac.add(configs=str(cfg01)) is True
# verify one configuration file loaded however since it recursively
# loaded itself 1 more time, it still doesn't impact the load count:
assert len(ac) == 1
# 2 services loaded now that we loaded the same file twice
assert len(ac.servers()) == 2
#
# Now we test relative file inclusion
#
# Create ourselves a config object
ac = AppriseConfig(recursion=10)
# There are no servers loaded
assert len(ac) == 0
# load our configuration
assert ac.add(configs=str(cfg02)) is True
# verify it loaded
assert len(ac) == 1
# 11 services loaded because we reloaded ourselves 10 times
# after loading the first entry
assert len(ac.servers()) == 11
# Test our include modes (strict, always, and never)
# Create ourselves a config object
ac = AppriseConfig(recursion=1)
# There are no servers loaded
assert len(ac) == 0
# load our configuration
assert ac.add(configs=str(cfg04)) is True
# verify it loaded
assert len(ac) == 1
# 2 servers loaded
# 1 - from the file read (which is set at mode STRICT
# 1 - from the always://
#
# The never:// can ever be includeed, and the strict:// is ot of type
# file:// (the one doing the include) so it is also ignored.
#
# By turning on the insecure_includes, we can include the strict files too
assert len(ac.servers()) == 2
# Create ourselves a config object
ac = AppriseConfig(recursion=1, insecure_includes=True)
# There are no servers loaded
assert len(ac) == 0
# load our configuration
assert ac.add(configs=str(cfg04)) is True
# verify it loaded
assert len(ac) == 1
# 3 servers loaded
# 1 - from the file read (which is set at mode STRICT
# 1 - from the always://
# 1 - from the strict:// (due to insecure_includes set)
assert len(ac.servers()) == 3
def test_apprise_config_matrix_load():
"""
API: AppriseConfig() matrix initialization
@ -649,6 +908,81 @@ def test_apprise_config_matrix_load():
__load_matrix()
def test_configmatrix_dynamic_importing(tmpdir):
"""
API: Apprise() Config Matrix Importing
"""
# Make our new path valid
suite = tmpdir.mkdir("apprise_config_test_suite")
suite.join("__init__.py").write('')
module_name = 'badconfig'
# Update our path to point to our new test suite
sys.path.insert(0, str(suite))
# Create a base area to work within
base = suite.mkdir(module_name)
base.join("__init__.py").write('')
# Test no app_id
base.join('ConfigBadFile1.py').write(
"""
class ConfigBadFile1(object):
pass""")
# No class of the same name
base.join('ConfigBadFile2.py').write(
"""
class BadClassName(object):
pass""")
# Exception thrown
base.join('ConfigBadFile3.py').write("""raise ImportError()""")
# Utilizes a schema:// already occupied (as string)
base.join('ConfigGoober.py').write(
"""
from apprise.config import ConfigBase
class ConfigGoober(ConfigBase):
# This class tests the fact we have a new class name, but we're
# trying to over-ride items previously used
# The default simple (insecure) protocol (used by ConfigHTTP)
protocol = ('http', 'goober')
# The default secure protocol (used by ConfigHTTP)
secure_protocol = 'https'
@staticmethod
def parse_url(url, *args, **kwargs):
# always parseable
return ConfigBase.parse_url(url, verify_host=False)""")
# Utilizes a schema:// already occupied (as tuple)
base.join('ConfigBugger.py').write("""
from apprise.config import ConfigBase
class ConfigBugger(ConfigBase):
# This class tests the fact we have a new class name, but we're
# trying to over-ride items previously used
# The default simple (insecure) protocol (used by ConfigHTTP), the other
# isn't
protocol = ('http', 'bugger-test' )
# The default secure protocol (used by ConfigHTTP), the other isn't
secure_protocol = ('https', ['garbage'])
@staticmethod
def parse_url(url, *args, **kwargs):
# always parseable
return ConfigBase.parse_url(url, verify_host=False)""")
__load_matrix(path=str(base), name=module_name)
@mock.patch('os.path.getsize')
def test_config_base_parse_inaccessible_text_file(mock_getsize, tmpdir):
"""

View File

@ -180,19 +180,27 @@ def test_apprise_cli_nux_env(tmpdir):
# Write a simple text based configuration file
t = tmpdir.mkdir("apprise-obj").join("apprise")
buf = """
# Include ourselves
include {}
taga,tagb=good://localhost
tagc=good://nuxref.com
"""
""".format(str(t))
t.write(buf)
# This will read our configuration and not send any notices at all
# because we assigned tags to all of our urls and didn't identify
# a specific match below.
# 'include' reference in configuration file would have included the file a
# second time (since recursion default is 1).
result = runner.invoke(cli.main, [
'-b', 'test config',
'--config', str(t),
])
assert result.exit_code == 2
# Even when recursion take place, tags are all honored
# so 2 is returned because nothing was notified
assert result.exit_code == 3
# This will send out 1 notification because our tag matches
# one of the entries above
@ -204,6 +212,48 @@ def test_apprise_cli_nux_env(tmpdir):
])
assert result.exit_code == 0
# Test recursion
result = runner.invoke(cli.main, [
'-t', 'test title',
'-b', 'test body',
'--config', str(t),
'--tag', 'tagc',
# Invalid entry specified for recursion
'-R', 'invalid',
])
assert result.exit_code == 2
result = runner.invoke(cli.main, [
'-t', 'test title',
'-b', 'test body',
'--config', str(t),
'--tag', 'tagc',
# missing entry specified for recursion
'--recursive-depth',
])
assert result.exit_code == 2
result = runner.invoke(cli.main, [
'-t', 'test title',
'-b', 'test body',
'--config', str(t),
'--tag', 'tagc',
# Disable recursion (thus inclusion will be ignored)
'-R', '0',
])
assert result.exit_code == 0
# Test recursion
result = runner.invoke(cli.main, [
'-t', 'test title',
'-b', 'test body',
'--config', str(t),
'--tag', 'tagc',
# Recurse up to 5 times
'--recursion-depth', '5',
])
assert result.exit_code == 0
# This will send out 2 notifications because by specifying 2 tag
# entries, we 'or' them together:
# translation: has taga or tagb or tagd
@ -252,7 +302,9 @@ def test_apprise_cli_nux_env(tmpdir):
'--config', str(t),
'--notification-type', 'invalid',
])
assert result.exit_code == 1
# An error code of 2 is returned if invalid input is specified on the
# command line
assert result.exit_code == 2
# The notification type switch is case-insensitive
result = runner.invoke(cli.main, [
@ -277,7 +329,9 @@ def test_apprise_cli_nux_env(tmpdir):
'--config', str(t),
'--input-format', 'invalid',
])
assert result.exit_code == 1
# An error code of 2 is returned if invalid input is specified on the
# command line
assert result.exit_code == 2
# The formatting switch is not case sensitive
result = runner.invoke(cli.main, [
@ -309,7 +363,7 @@ def test_apprise_cli_nux_env(tmpdir):
'--config', str(t),
'--tag', 'mytag',
])
assert result.exit_code == 2
assert result.exit_code == 3
# Same command as the one identified above except we set the --dry-run
# flag. This causes our list of matched results to be printed only.
@ -321,7 +375,7 @@ def test_apprise_cli_nux_env(tmpdir):
'--tag', 'mytag',
'--dry-run'
])
assert result.exit_code == 2
assert result.exit_code == 3
# Here is a case where we get what was expected; we also attach a file
result = runner.invoke(cli.main, [

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# Copyright (C) 2020 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
@ -23,12 +23,10 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import sys
import six
import pytest
from apprise.AppriseAsset import AppriseAsset
from apprise.config.ConfigBase import ConfigBase
from apprise.config import __load_matrix
from apprise import ConfigFormat
# Disable logging for a cleaner testing output
@ -103,9 +101,9 @@ def test_config_base_detect_config_format():
"""
# Garbage Handling
assert ConfigBase.detect_config_format(object()) is None
assert ConfigBase.detect_config_format(None) is None
assert ConfigBase.detect_config_format(12) is None
for garbage in (object(), None, 42):
# A response is always correctly returned
assert ConfigBase.detect_config_format(garbage) is None
# Empty files are valid
assert ConfigBase.detect_config_format('') is ConfigFormat.TEXT
@ -143,10 +141,15 @@ def test_config_base_config_parse():
"""
# Garbage Handling
assert isinstance(ConfigBase.config_parse(object()), list)
assert isinstance(ConfigBase.config_parse(None), list)
assert isinstance(ConfigBase.config_parse(''), list)
assert isinstance(ConfigBase.config_parse(12), list)
for garbage in (object(), None, 42):
# A response is always correctly returned
result = ConfigBase.config_parse(garbage)
# response is a tuple...
assert isinstance(result, tuple)
# containing 2 items (plugins, config)
assert len(result) == 2
# In the case of garbage in, we get garbage out; both lists are empty
assert result == (list(), list())
# Valid Text Configuration
result = ConfigBase.config_parse("""
@ -154,9 +157,16 @@ def test_config_base_config_parse():
mailto://userb:pass@gmail.com
""", asset=AppriseAsset())
# We expect to parse 1 entry from the above
assert isinstance(result, list)
assert len(result) == 1
assert len(result[0].tags) == 0
assert isinstance(result, tuple)
assert len(result) == 2
# The first element is the number of notification services processed
assert len(result[0]) == 1
# If we index into the item, we can check to see the tags associate
# with it
assert len(result[0][0].tags) == 0
# The second is the number of configuration include lines parsed
assert len(result[1]) == 0
# Valid Configuration
result = ConfigBase.config_parse("""
@ -174,11 +184,13 @@ urls:
""", asset=AppriseAsset())
# We expect to parse 3 entries from the above
assert isinstance(result, list)
assert len(result) == 3
assert len(result[0].tags) == 0
assert len(result[1].tags) == 0
assert len(result[2].tags) == 2
assert isinstance(result, tuple)
assert len(result) == 2
assert isinstance(result[0], list)
assert len(result[0]) == 3
assert len(result[0][0].tags) == 0
assert len(result[0][1].tags) == 0
assert len(result[0][2].tags) == 2
# Test case where we pass in a bad format
result = ConfigBase.config_parse("""
@ -187,10 +199,11 @@ urls:
""", config_format='invalid-format')
# This is not parseable despite the valid text
assert isinstance(result, list)
assert len(result) == 0
assert isinstance(result, tuple)
assert isinstance(result[0], list)
assert len(result[0]) == 0
result = ConfigBase.config_parse("""
result, _ = ConfigBase.config_parse("""
; A comment line over top of a URL
mailto://userb:pass@gmail.com
""", config_format=ConfigFormat.TEXT)
@ -207,12 +220,18 @@ def test_config_base_config_parse_text():
"""
# Garbage Handling
assert isinstance(ConfigBase.config_parse_text(object()), list)
assert isinstance(ConfigBase.config_parse_text(None), list)
assert isinstance(ConfigBase.config_parse_text(''), list)
for garbage in (object(), None, 42):
# A response is always correctly returned
result = ConfigBase.config_parse_text(garbage)
# response is a tuple...
assert isinstance(result, tuple)
# containing 2 items (plugins, config)
assert len(result) == 2
# In the case of garbage in, we get garbage out; both lists are empty
assert result == (list(), list())
# Valid Configuration
result = ConfigBase.config_parse_text("""
result, config = ConfigBase.config_parse_text("""
# A comment line over top of a URL
mailto://userb:pass@gmail.com
@ -225,10 +244,16 @@ def test_config_base_config_parse_text():
# A line with mulitiple tag assignments to it
taga,tagb=kde://
""", asset=AppriseAsset())
# An include statement to Apprise API with trailing spaces:
include http://localhost:8080/notify/apprise
# A relative include statement (with trailing spaces)
include apprise.cfg """, asset=AppriseAsset())
# We expect to parse 3 entries from the above
assert isinstance(result, list)
assert isinstance(config, list)
assert len(result) == 3
assert len(result[0].tags) == 0
@ -237,9 +262,13 @@ def test_config_base_config_parse_text():
assert 'taga' in result[-1].tags
assert 'tagb' in result[-1].tags
assert len(config) == 2
assert 'http://localhost:8080/notify/apprise' in config
assert 'apprise.cfg' in config
# Here is a similar result set however this one has an invalid line
# in it which invalidates the entire file
result = ConfigBase.config_parse_text("""
result, config = ConfigBase.config_parse_text("""
# A comment line over top of a URL
mailto://userc:pass@gmail.com
@ -249,12 +278,18 @@ def test_config_base_config_parse_text():
I am an invalid line that does not follow any of the Apprise file rules!
""")
# We expect to parse 0 entries from the above
# We expect to parse 0 entries from the above because the invalid line
# invalidates the entire configuration file. This is for security reasons;
# we don't want to point at files load content in them just because they
# resemble an Apprise configuration.
assert isinstance(result, list)
assert len(result) == 0
# There were no include entries defined
assert len(config) == 0
# More invalid data
result = ConfigBase.config_parse_text("""
result, config = ConfigBase.config_parse_text("""
# An invalid URL
invalid://user:pass@gmail.com
@ -266,30 +301,33 @@ def test_config_base_config_parse_text():
# Just 1 token provided
sns://T1JJ3T3L2/
# Even with the above invalid entries, we can still
# have valid include lines
include file:///etc/apprise.cfg
# An invalid include (nothing specified afterwards)
include
# An include of a config type we don't support
include invalid://
""")
# We expect to parse 0 entries from the above
assert isinstance(result, list)
assert len(result) == 0
# Here is an empty file
result = ConfigBase.config_parse_text('')
# We expect to parse 0 entries from the above
assert isinstance(result, list)
assert len(result) == 0
# There was 1 valid entry
assert len(config) == 0
# Test case where a comment is on it's own line with nothing else
result = ConfigBase.config_parse_text("#")
result, config = ConfigBase.config_parse_text("#")
# We expect to parse 0 entries from the above
assert isinstance(result, list)
assert len(result) == 0
# Test case of empty file
result = ConfigBase.config_parse_text("")
# We expect to parse 0 entries from the above
assert isinstance(result, list)
assert len(result) == 0
# There were no include entries defined
assert len(config) == 0
def test_config_base_config_parse_yaml():
@ -302,19 +340,28 @@ def test_config_base_config_parse_yaml():
asset = AppriseAsset()
# Garbage Handling
assert isinstance(ConfigBase.config_parse_yaml(object()), list)
assert isinstance(ConfigBase.config_parse_yaml(None), list)
assert isinstance(ConfigBase.config_parse_yaml(''), list)
for garbage in (object(), None, '', 42):
# A response is always correctly returned
result = ConfigBase.config_parse_yaml(garbage)
# response is a tuple...
assert isinstance(result, tuple)
# containing 2 items (plugins, config)
assert len(result) == 2
# In the case of garbage in, we get garbage out; both lists are empty
assert result == (list(), list())
# Invalid Version
result = ConfigBase.config_parse_yaml("version: 2a", asset=asset)
result, config = ConfigBase.config_parse_yaml("version: 2a", asset=asset)
# Invalid data gets us an empty result set
assert isinstance(result, list)
assert len(result) == 0
# There were no include entries defined
assert len(config) == 0
# Invalid Syntax (throws a ScannerError)
result = ConfigBase.config_parse_yaml("""
result, config = ConfigBase.config_parse_yaml("""
# if no version is specified then version 1 is presumed
version: 1
@ -325,8 +372,11 @@ urls
assert isinstance(result, list)
assert len(result) == 0
# There were no include entries defined
assert len(config) == 0
# Missing url token
result = ConfigBase.config_parse_yaml("""
result, config = ConfigBase.config_parse_yaml("""
# if no version is specified then version 1 is presumed
version: 1
@ -336,8 +386,11 @@ version: 1
assert isinstance(result, list)
assert len(result) == 0
# There were no include entries defined
assert len(config) == 0
# No urls defined
result = ConfigBase.config_parse_yaml("""
result, config = ConfigBase.config_parse_yaml("""
# if no version is specified then version 1 is presumed
version: 1
@ -348,8 +401,11 @@ urls:
assert isinstance(result, list)
assert len(result) == 0
# There were no include entries defined
assert len(config) == 0
# Invalid url defined
result = ConfigBase.config_parse_yaml("""
result, config = ConfigBase.config_parse_yaml("""
# if no version is specified then version 1 is presumed
version: 1
@ -361,8 +417,11 @@ urls: 43
assert isinstance(result, list)
assert len(result) == 0
# There were no include entries defined
assert len(config) == 0
# Invalid url/schema
result = ConfigBase.config_parse_yaml("""
result, config = ConfigBase.config_parse_yaml("""
# if no version is specified then version 1 is presumed
version: 1
@ -375,8 +434,11 @@ urls:
assert isinstance(result, list)
assert len(result) == 0
# There were no include entries defined
assert len(config) == 0
# Invalid url/schema
result = ConfigBase.config_parse_yaml("""
result, config = ConfigBase.config_parse_yaml("""
# if no version is specified then version 1 is presumed
version: 1
@ -390,8 +452,14 @@ urls:
assert isinstance(result, list)
assert len(result) == 0
# There were no include entries defined
assert len(config) == 0
# Invalid url/schema
result = ConfigBase.config_parse_yaml("""
result, config = ConfigBase.config_parse_yaml("""
# Include entry with nothing associated with it
include:
urls:
- just some free text that isn't valid:
- a garbage entry to go with it
@ -402,8 +470,11 @@ urls:
assert isinstance(result, list)
assert len(result) == 0
# There were no include entries defined
assert len(config) == 0
# Invalid url/schema
result = ConfigBase.config_parse_yaml("""
result, config = ConfigBase.config_parse_yaml("""
# if no version is specified then version 1 is presumed
version: 1
@ -416,19 +487,31 @@ urls:
assert isinstance(result, list)
assert len(result) == 0
# There were no include entries defined
assert len(config) == 0
# Invalid url/schema
result = ConfigBase.config_parse_yaml("""
result, config = ConfigBase.config_parse_yaml("""
# no lists... just no
urls: [milk, pumpkin pie, eggs, juice]
# Including by list is okay
include: [file:///absolute/path/, relative/path, http://test.com]
""", asset=asset)
# Invalid data gets us an empty result set
assert isinstance(result, list)
assert len(result) == 0
# There were 3 include entries
assert len(config) == 3
assert 'file:///absolute/path/' in config
assert 'relative/path' in config
assert 'http://test.com' in config
# Invalid url/schema
result = ConfigBase.config_parse_yaml("""
result, config = ConfigBase.config_parse_yaml("""
urls:
# a very invalid sns entry
- sns://T1JJ3T3L2/
@ -437,7 +520,7 @@ urls:
- sns://T1JJ3T3L2/:
- invalid: test
# some strangness
# some strangeness
-
-
- test
@ -448,11 +531,30 @@ urls:
assert isinstance(result, list)
assert len(result) == 0
# There were no include entries defined
assert len(config) == 0
# Valid Configuration
result = ConfigBase.config_parse_yaml("""
result, config = ConfigBase.config_parse_yaml("""
# if no version is specified then version 1 is presumed
version: 1
# Including by dict
include:
# File includes
- file:///absolute/path/
- relative/path
# Trailing colon shouldn't disrupt include
- http://test.com:
# invalid (numeric)
- 4
# some strangeness
-
-
- test
#
# Define your notification urls:
#
@ -469,8 +571,17 @@ urls:
assert len(result) == 3
assert len(result[0].tags) == 0
# There were 3 include entries
assert len(config) == 3
assert 'file:///absolute/path/' in config
assert 'relative/path' in config
assert 'http://test.com' in config
# Valid Configuration
result = ConfigBase.config_parse_yaml("""
result, config = ConfigBase.config_parse_yaml("""
# A single line include is supported
include: http://localhost:8080/notify/apprise
urls:
- json://localhost:
- tag: my-custom-tag, my-other-tag
@ -504,8 +615,12 @@ urls:
assert len(result) == 5
assert len(result[0].tags) == 2
# Our single line included
assert len(config) == 1
assert 'http://localhost:8080/notify/apprise' in config
# Global Tags
result = ConfigBase.config_parse_yaml("""
result, config = ConfigBase.config_parse_yaml("""
# Global Tags stacked as a list
tag:
- admin
@ -520,13 +635,16 @@ urls:
assert isinstance(result, list)
assert len(result) == 2
# There were no include entries defined
assert len(config) == 0
# all entries will have our global tags defined in them
for entry in result:
assert 'admin' in entry.tags
assert 'devops' in entry.tags
# Global Tags
result = ConfigBase.config_parse_yaml("""
result, config = ConfigBase.config_parse_yaml("""
# Global Tags
tag: admin, devops
@ -559,8 +677,11 @@ urls:
assert len(result[1].tags) == 4
assert 'list-tag' in result[1].tags
# There were no include entries defined
assert len(config) == 0
# An invalid set of entries
result = ConfigBase.config_parse_yaml("""
result, config = ConfigBase.config_parse_yaml("""
urls:
# The following tags will get added to the global set
- json://localhost:
@ -573,11 +694,14 @@ urls:
assert isinstance(result, list)
assert len(result) == 0
# There were no include entries defined
assert len(config) == 0
# An asset we'll manipulate
asset = AppriseAsset()
# Global Tags
result = ConfigBase.config_parse_yaml("""
result, config = ConfigBase.config_parse_yaml("""
# Test the creation of our apprise asset object
asset:
app_id: AppriseTest
@ -610,6 +734,10 @@ urls:
# We expect to parse 3 entries from the above
assert isinstance(result, list)
assert len(result) == 1
# There were no include entries defined
assert len(config) == 0
assert asset.app_id == "AppriseTest"
assert asset.app_desc == "Apprise Test Notifications"
assert asset.app_url == "http://nuxref.com"
@ -626,7 +754,7 @@ urls:
# For on-lookers looking through this file; here is a perfectly formatted
# YAML configuration file for your reference so you can see it without
# all of the errors like the ones identified above
result = ConfigBase.config_parse_yaml("""
result, config = ConfigBase.config_parse_yaml("""
# if no version is specified then version 1 is presumed. Thus this is a
# completely optional field. It's a good idea to just add this line because it
# will help with future ambiguity (if it ever occurs).
@ -713,66 +841,5 @@ urls:
assert 'customer' in result[5].tags
assert 'chris' in result[5].tags
def test_config_matrix_dynamic_importing(tmpdir):
"""
API: Apprise() Config Matrix Importing
"""
# Make our new path valid
suite = tmpdir.mkdir("apprise_config_test_suite")
suite.join("__init__.py").write('')
module_name = 'badconfig'
# Update our path to point to our new test suite
sys.path.insert(0, str(suite))
# Create a base area to work within
base = suite.mkdir(module_name)
base.join("__init__.py").write('')
# Test no app_id
base.join('ConfigBadFile1.py').write(
"""
class ConfigBadFile1(object):
pass""")
# No class of the same name
base.join('ConfigBadFile2.py').write(
"""
class BadClassName(object):
pass""")
# Exception thrown
base.join('ConfigBadFile3.py').write("""raise ImportError()""")
# Utilizes a schema:// already occupied (as string)
base.join('ConfigGoober.py').write(
"""
from apprise import ConfigBase
class ConfigGoober(ConfigBase):
# This class tests the fact we have a new class name, but we're
# trying to over-ride items previously used
# The default simple (insecure) protocol
protocol = 'http'
# The default secure protocol
secure_protocol = 'https'""")
# Utilizes a schema:// already occupied (as tuple)
base.join('ConfigBugger.py').write("""
from apprise import ConfigBase
class ConfigBugger(ConfigBase):
# This class tests the fact we have a new class name, but we're
# trying to over-ride items previously used
# The default simple (insecure) protocol
protocol = ('http', 'bugger-test' )
# The default secure protocol
secure_protocol = ('https', 'bugger-tests')""")
__load_matrix(path=str(base), name=module_name)
# There were no include entries defined
assert len(config) == 0