Group/Alias Configuration Support (#967)

pull/971/head
Chris Caron 2023-10-07 17:01:06 -04:00 committed by GitHub
parent c34a44fe5f
commit fdc85f502d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 507 additions and 32 deletions

View File

@ -356,6 +356,77 @@ class ConfigBase(URLBase):
# missing and/or expired.
return True
@staticmethod
def __normalize_tag_groups(group_tags):
"""
Used to normalize a tag assign map which looks like:
{
'group': set('{tag1}', '{group1}', '{tag2}'),
'group1': set('{tag2}','{tag3}'),
}
Then normalized it (merging groups); with respect to the above, the
output would be:
{
'group': set('{tag1}', '{tag2}', '{tag3}),
'group1': set('{tag2}','{tag3}'),
}
"""
# Prepare a key set list we can use
tag_groups = set([str(x) for x in group_tags.keys()])
def _expand(tags, ignore=None):
"""
Expands based on tag provided and returns a set
this also updates the group_tags while it goes
"""
# Prepare ourselves a return set
results = set()
ignore = set() if ignore is None else ignore
# track groups
groups = set()
for tag in tags:
if tag in ignore:
continue
# Track our groups
groups.add(tag)
# Store what we know is worth keping
results |= group_tags[tag] - tag_groups
# Get simple tag assignments
found = group_tags[tag] & tag_groups
if not found:
continue
for gtag in found:
if gtag in ignore:
continue
# Go deeper (recursion)
ignore.add(tag)
group_tags[gtag] = _expand(set([gtag]), ignore=ignore)
results |= group_tags[gtag]
# Pop ignore
ignore.remove(tag)
return results
for tag in tag_groups:
# Get our tags
group_tags[tag] |= _expand(set([tag]))
if not group_tags[tag]:
ConfigBase.logger.warning(
'The group {} has no tags assigned to it'.format(tag))
del group_tags[tag]
@staticmethod
def parse_url(url, verify_host=True):
"""Parses the URL and returns it broken apart into a dictionary.
@ -541,6 +612,9 @@ class ConfigBase(URLBase):
# as additional configuration entries when loaded.
include <ConfigURL>
# Assign tag contents to a group identifier
<Group(s)>=<Tag(s)>
"""
# A list of loaded Notification Services
servers = list()
@ -549,6 +623,12 @@ class ConfigBase(URLBase):
# the include keyword
configs = list()
# Track all of the tags we want to assign later on
group_tags = {}
# Track our entries to preload
preloaded = []
# Prepare our Asset Object
asset = asset if isinstance(asset, AppriseAsset) else AppriseAsset()
@ -556,7 +636,7 @@ class ConfigBase(URLBase):
valid_line_re = re.compile(
r'^\s*(?P<line>([;#]+(?P<comment>.*))|'
r'(\s*(?P<tags>[a-z0-9, \t_-]+)\s*=|=)?\s*'
r'(?P<url>[a-z0-9]{2,9}://.*)|'
r'((?P<url>[a-z0-9]{2,9}://.*)|(?P<assign>[a-z0-9, \t_-]+))|'
r'include\s+(?P<config>.+))?\s*$', re.I)
try:
@ -582,8 +662,13 @@ class ConfigBase(URLBase):
# otherwise.
return (list(), list())
url, config = result.group('url'), result.group('config')
if not (url or config):
# Retrieve our line
url, assign, config = \
result.group('url'), \
result.group('assign'), \
result.group('config')
if not (url or config or assign):
# Comment/empty line; do nothing
continue
@ -603,6 +688,33 @@ class ConfigBase(URLBase):
loggable_url = url if not asset.secure_logging \
else cwe312_url(url)
if assign:
groups = set(parse_list(result.group('tags'), cast=str))
if not groups:
# no tags were assigned
ConfigBase.logger.warning(
'Unparseable tag assignment - no group(s) '
'on line {}'.format(line))
continue
# Get our tags
tags = set(parse_list(assign, cast=str))
if not tags:
# no tags were assigned
ConfigBase.logger.warning(
'Unparseable tag assignment - no tag(s) to assign '
'on line {}'.format(line))
continue
# Update our tag group map
for tag_group in groups:
if tag_group not in group_tags:
group_tags[tag_group] = set()
# ensure our tag group is never included in the assignment
group_tags[tag_group] |= tags - set([tag_group])
continue
# Acquire our url tokens
results = plugins.url_to_dict(
url, secure_logging=asset.secure_logging)
@ -615,25 +727,57 @@ class ConfigBase(URLBase):
# Build a list of tags to associate with the newly added
# notifications if any were set
results['tag'] = set(parse_list(result.group('tags')))
results['tag'] = set(parse_list(result.group('tags'), cast=str))
# Set our Asset Object
results['asset'] = asset
# Store our preloaded entries
preloaded.append({
'results': results,
'line': line,
'loggable_url': loggable_url,
})
#
# Normalize Tag Groups
# - Expand Groups of Groups so that they don't exist
#
ConfigBase.__normalize_tag_groups(group_tags)
#
# URL Processing
#
for entry in preloaded:
# Point to our results entry for easier reference below
results = entry['results']
#
# Apply our tag groups if they're defined
#
for group, tags in group_tags.items():
# Detect if anything assigned to this tag also maps back to a
# group. If so we want to add the group to our list
if next((True for tag in results['tag']
if tag in tags), False):
results['tag'].add(group)
try:
# Attempt to create an instance of our plugin using the
# parsed URL information
plugin = common.NOTIFY_SCHEMA_MAP[results['schema']](**results)
plugin = common.NOTIFY_SCHEMA_MAP[
results['schema']](**results)
# Create log entry of loaded URL
ConfigBase.logger.debug(
'Loaded URL: %s', plugin.url(privacy=asset.secure_logging))
'Loaded URL: %s', plugin.url(
privacy=results['asset'].secure_logging))
except Exception as e:
# the arguments are invalid or can not be used.
ConfigBase.logger.warning(
'Could not load URL {} on line {}.'.format(
loggable_url, line))
entry['loggable_url'], entry['line']))
ConfigBase.logger.debug('Loading Exception: %s' % str(e))
continue
@ -665,6 +809,12 @@ class ConfigBase(URLBase):
# the include keyword
configs = list()
# Group Assignments
group_tags = {}
# Track our entries to preload
preloaded = []
try:
# Load our data (safely)
result = yaml.load(content, Loader=yaml.SafeLoader)
@ -746,7 +896,45 @@ class ConfigBase(URLBase):
tags = result.get('tag', None)
if tags and isinstance(tags, (list, tuple, str)):
# Store any preset tags
global_tags = set(parse_list(tags))
global_tags = set(parse_list(tags, cast=str))
#
# groups root directive
#
groups = result.get('groups', None)
if not isinstance(groups, (list, tuple)):
# Not a problem; we simply have no group entry
groups = list()
# Iterate over each group defined and store it
for no, entry in enumerate(groups):
if not isinstance(entry, dict):
ConfigBase.logger.warning(
'No assignment for group {}, entry #{}'.format(
entry, no + 1))
continue
for _groups, tags in entry.items():
for group in parse_list(_groups, cast=str):
if isinstance(tags, (list, tuple)):
_tags = set()
for e in tags:
if isinstance(e, dict):
_tags |= set(e.keys())
else:
_tags |= set(parse_list(e, cast=str))
# Final assignment
tags = _tags
else:
tags = set(parse_list(tags, cast=str))
if group not in group_tags:
group_tags[group] = tags
else:
group_tags[group] |= tags
#
# include root directive
@ -938,8 +1126,8 @@ class ConfigBase(URLBase):
# The below ensures our tags are set correctly
if 'tag' in _results:
# Tidy our list up
_results['tag'] = \
set(parse_list(_results['tag'])) | global_tags
_results['tag'] = set(
parse_list(_results['tag'], cast=str)) | global_tags
else:
# Just use the global settings
@ -965,29 +1153,59 @@ class ConfigBase(URLBase):
# Prepare our Asset Object
_results['asset'] = asset
# Now we generate our plugin
try:
# Attempt to create an instance of our plugin using the
# parsed URL information
plugin = common.\
NOTIFY_SCHEMA_MAP[_results['schema']](**_results)
# Store our preloaded entries
preloaded.append({
'results': _results,
'entry': no + 1,
'item': entry,
})
# Create log entry of loaded URL
ConfigBase.logger.debug(
'Loaded URL: {}'.format(
plugin.url(privacy=asset.secure_logging)))
#
# Normalize Tag Groups
# - Expand Groups of Groups so that they don't exist
#
ConfigBase.__normalize_tag_groups(group_tags)
except Exception as e:
# the arguments are invalid or can not be used.
ConfigBase.logger.warning(
'Could not load Apprise YAML configuration '
'entry #{}, item #{}'
.format(no + 1, entry))
ConfigBase.logger.debug('Loading Exception: %s' % str(e))
continue
#
# URL Processing
#
for entry in preloaded:
# Point to our results entry for easier reference below
results = entry['results']
# if we reach here, we successfully loaded our data
servers.append(plugin)
#
# Apply our tag groups if they're defined
#
for group, tags in group_tags.items():
# Detect if anything assigned to this tag also maps back to a
# group. If so we want to add the group to our list
if next((True for tag in results['tag']
if tag in tags), False):
results['tag'].add(group)
# Now we generate our plugin
try:
# Attempt to create an instance of our plugin using the
# parsed URL information
plugin = common.\
NOTIFY_SCHEMA_MAP[results['schema']](**results)
# Create log entry of loaded URL
ConfigBase.logger.debug(
'Loaded URL: %s', plugin.url(
privacy=results['asset'].secure_logging))
except Exception as e:
# the arguments are invalid or can not be used.
ConfigBase.logger.warning(
'Could not load Apprise YAML configuration '
'entry #{}, item #{}'
.format(entry['entry'], entry['item']))
ConfigBase.logger.debug('Loading Exception: %s' % str(e))
continue
# if we reach here, we successfully loaded our data
servers.append(plugin)
return (servers, configs)

View File

@ -1120,7 +1120,7 @@ def urlencode(query, doseq=False, safe='', encoding=None, errors=None):
errors=errors)
def parse_list(*args):
def parse_list(*args, cast=None):
"""
Take a string list and break it into a delimited
list of arguments. This funciton also supports
@ -1143,6 +1143,9 @@ def parse_list(*args):
result = []
for arg in args:
if not isinstance(arg, (str, set, list, bool, tuple)) and arg and cast:
arg = cast(arg)
if isinstance(arg, str):
result += re.split(STRING_DELIMITERS, arg)
@ -1155,7 +1158,6 @@ def parse_list(*args):
# Since Python v3 returns a filter (iterator) whereas Python v2 returned
# a list, we need to change it into a list object to remain compatible with
# both distribution types.
# TODO: Review after dropping support for Python 2.
return sorted([x for x in filter(bool, list(set(result)))])

View File

@ -33,6 +33,7 @@
import pytest
from apprise.AppriseAsset import AppriseAsset
from apprise.config.ConfigBase import ConfigBase
from apprise import Apprise
from apprise import ConfigFormat
from inspect import cleandoc
import yaml
@ -357,6 +358,149 @@ def test_config_base_config_parse_text():
assert 'tag3' in result[0].tags
def test_config_base_config_tag_groups_text():
"""
API: ConfigBase.config_tag_groups_text object
"""
# Valid Configuration
result, config = ConfigBase.config_parse_text("""
# Tag assignments
groupA, groupB = tagB, tagC
# groupB doubles down as it takes the entries initialized above
# plus the added ones defined below
groupB = tagA, tagB, tagD
groupC = groupA, groupB, groupC, tagE
# Tag that recursively looks to more tags
groupD = groupC
# Assigned ourselves
groupX = groupX
# Set up a recursive loop
groupE = groupF
groupF = groupE
# Set up a larger recursive loop
groupG = groupH
groupH = groupI
groupI = groupJ
groupJ = groupK
groupK = groupG
# Bad assignments
groupM = , , ,
, , = , , ,
# int's and floats are okay
1 = 2
a = 5
# A comment line over top of a URL
4, groupB = mailto://userb:pass@gmail.com
# Tag Assignments
tagA,groupB=json://localhost
# More Tag Assignments
tagC,groupB=xml://localhost
# More Tag Assignments
groupD=form://localhost
""", asset=AppriseAsset())
# We expect to parse 3 entries from the above
assert isinstance(result, list)
assert isinstance(config, list)
assert len(result) == 4
# Our first element is our group tags
assert len(result[0].tags) == 2
assert 'groupB' in result[0].tags
assert '4' in result[0].tags
# No additional configuration is loaded
assert len(config) == 0
apobj = Apprise()
assert apobj.add(result)
# We match against 1 entry
assert len([x for x in apobj.find('tagA')]) == 1
assert len([x for x in apobj.find('tagB')]) == 0
assert len([x for x in apobj.find('groupA')]) == 1
assert len([x for x in apobj.find('groupB')]) == 3
assert len([x for x in apobj.find('groupC')]) == 2
assert len([x for x in apobj.find('groupD')]) == 3
# Invalid Assignment
result, config = ConfigBase.config_parse_text("""
# Must have something to equal or it's a bad line
group =
# A tag Assignments that is never gotten to as the line
# above is bad
groupD=form://localhost
""")
# We expect to parse 3 entries from the above
assert isinstance(result, list)
assert isinstance(config, list)
assert len(result) == 0
assert len(config) == 0
# Invalid Assignment
result, config = ConfigBase.config_parse_text("""
# Rundant assignment
group = group
# Our group assignment
group=windows://
""")
# the redundant assignment does us no harm; but it doesn't grant us any
# value either
assert isinstance(result, list)
assert len(result) == 1
# Our first element is our group tags
assert len(result[0].tags) == 1
assert 'group' in result[0].tags
# There were no include entries defined
assert len(config) == 0
# More invalid data
result, config = ConfigBase.config_parse_text("""
# A tag without a url or group assignment
taga=
""")
# We expect to parse 0 entries from the above
assert isinstance(result, list)
assert len(result) == 0
# There were no include entries defined
assert len(config) == 0
result, config = ConfigBase.config_parse_text("""
# A tag without a url or group assignment
taga= %%INVALID
""")
# We expect to parse 0 entries from the above
assert isinstance(result, list)
assert len(result) == 0
# There were no include entries defined
assert len(config) == 0
def test_config_base_config_parse_text_with_url():
"""
API: ConfigBase.config_parse_text object_with_url
@ -1015,6 +1159,117 @@ def test_yaml_vs_text_tagging():
assert 'mytag' in yaml_result[0]
def test_config_base_config_tag_groups_yaml():
"""
API: ConfigBase.config_tag_groups_yaml object
"""
# general reference used below
asset = AppriseAsset()
# Valid Configuration
result, config = ConfigBase.config_parse_yaml("""
# if no version is specified then version 1 is presumed
version: 1
groups:
- group1: tagB, tagC, tagNotAssigned
- group2:
- tagA
- tagC
- group3:
- tagD: optional comment
- tagA: optional comment #2
# No assignment
- group4
# No assignment type 2
- group5:
# Integer assignment
- group6: 3
- group6: 3, 4, 5, test
- group6: 3.5, tagC
# Recursion
- groupA: groupB
- groupB: groupA
# And Again... (just because)
- groupA: groupB
- groupB: groupA
# Self assignment
- groupX: groupX
# Set up a larger recursive loop
- groupG: groupH
- groupH: groupI, groupJ
- groupI: groupJ, groupG
- groupJ: groupK, groupH, groupI
- groupK: groupG
# No tags assigned
- groupK: ",, , ,"
- " , ": ",, , ,"
# Multi Assignments
- groupL, groupM: tagD, tagA
- 4, groupN:
- tagD
- tagE, TagA
# Add one more tag to groupL making it different then GroupM by 1
- groupL: tagB
#
# Define your notification urls:
#
urls:
- form://localhost:
- tag: tagA
- mailto://test:password@gmail.com:
- tag: tagB
- xml://localhost:
- tag: tagC
- json://localhost:
- tag: tagD, tagA
""", asset=asset)
# We expect to parse 3 entries from the above
assert isinstance(result, list)
assert isinstance(config, list)
assert len(result) == 4
# Our first element is our group tags
assert len(result[0].tags) == 5
assert 'group2' in result[0].tags
assert 'group3' in result[0].tags
assert 'groupL' in result[0].tags
assert 'groupM' in result[0].tags
assert 'tagA' in result[0].tags
# No additional configuration is loaded
assert len(config) == 0
apobj = Apprise()
assert apobj.add(result)
# We match against 1 entry
assert len([x for x in apobj.find('tagA')]) == 2
assert len([x for x in apobj.find('tagB')]) == 1
assert len([x for x in apobj.find('tagC')]) == 1
assert len([x for x in apobj.find('tagD')]) == 1
assert len([x for x in apobj.find('group1')]) == 2
assert len([x for x in apobj.find('group2')]) == 3
assert len([x for x in apobj.find('group3')]) == 2
assert len([x for x in apobj.find('group4')]) == 0
assert len([x for x in apobj.find('group5')]) == 0
assert len([x for x in apobj.find('group6')]) == 2
assert len([x for x in apobj.find('4')]) == 1
assert len([x for x in apobj.find('groupN')]) == 1
def test_config_base_config_parse_yaml_globals():
"""
API: ConfigBase.config_parse_yaml globals