MSTeams/Apprise custom template support (#299)

pull/307/head
Chris Caron 2020-10-02 20:50:27 -04:00 committed by GitHub
parent 525f386959
commit 8ca8e51833
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 974 additions and 68 deletions

View File

@ -805,16 +805,16 @@ class ConfigBase(URLBase):
.format(key, no + 1))
continue
# Store our URL and Schema Regex
_url = key
# Store our schema
schema = _schema.group('schema').lower()
# Store our URL and Schema Regex
_url = key
if _url is None:
# the loop above failed to match anything
ConfigBase.logger.warning(
'Unsupported schema in urls, entry #{}'.format(no + 1))
'Unsupported URL, entry #{}'.format(no + 1))
continue
_results = plugins.url_to_dict(_url)
@ -844,6 +844,11 @@ class ConfigBase(URLBase):
if 'schema' in entries:
del entries['schema']
# support our special tokens (if they're present)
if schema in plugins.SCHEMA_MAP:
entries = ConfigBase.__extract_special_tokens(
schema, entries)
# Extend our dictionary with our new entries
r.update(entries)
@ -851,6 +856,11 @@ class ConfigBase(URLBase):
results.append(r)
elif isinstance(tokens, dict):
# support our special tokens (if they're present)
if schema in plugins.SCHEMA_MAP:
tokens = ConfigBase.__extract_special_tokens(
schema, tokens)
# Copy ourselves a template of our parsed URL as a base to
# work with
r = _results.copy()
@ -949,6 +959,53 @@ class ConfigBase(URLBase):
# Pop the element off of the stack
return self._cached_servers.pop(index)
@staticmethod
def __extract_special_tokens(schema, tokens):
"""
This function takes a list of tokens and updates them to no longer
include any special tokens such as +,-, and :
- schema must be a valid schema of a supported plugin type
- tokens must be a dictionary containing the yaml entries parsed.
The idea here is we can post process a set of tokens provided in
a YAML file where the user provided some of the special keywords.
We effectivley look up what these keywords map to their appropriate
value they're expected
"""
# Create a copy of our dictionary
tokens = tokens.copy()
for kw, meta in plugins.SCHEMA_MAP[schema]\
.template_kwargs.items():
# Determine our prefix:
prefix = meta.get('prefix', '+')
# Detect any matches
matches = \
{k[1:]: str(v) for k, v in tokens.items()
if k.startswith(prefix)}
if not matches:
# we're done with this entry
continue
if not isinstance(tokens.get(kw, None), dict):
# Invalid; correct it
tokens[kw] = dict()
# strip out processed tokens
tokens = {k: v for k, v in tokens.items()
if not k.startswith(prefix)}
# Update our entries
tokens[kw].update(matches)
# Return our tokens
return tokens
def __getitem__(self, index):
"""
Returns the indexed server entry associated with the loaded

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# Copyright (C) 2020 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
@ -62,7 +62,7 @@
#
import re
import requests
from json import dumps
import json
from .NotifyBase import NotifyBase
from ..common import NotifyImageSize
@ -70,8 +70,18 @@ from ..common import NotifyType
from ..common import NotifyFormat
from ..utils import parse_bool
from ..utils import validate_regex
from ..utils import apply_template
from ..utils import TemplateType
from ..AppriseAttachment import AppriseAttachment
from ..AppriseLocale import gettext_lazy as _
try:
from json.decoder import JSONDecodeError
except ImportError:
# Python v2.7 Backwards Compatibility support
JSONDecodeError = ValueError
# Used to prepare our UUID regex matching
UUID4_RE = \
r'[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}'
@ -106,6 +116,10 @@ class NotifyMSTeams(NotifyBase):
# Default Notification Format
notify_format = NotifyFormat.MARKDOWN
# There is no reason we should exceed 35KB when reading in a JSON file.
# If it is more than this, then it is not accepted
max_msteams_template_size = 35000
# Define object templates
templates = (
'{schema}://{token_a}/{token_b}{token_c}',
@ -150,12 +164,30 @@ class NotifyMSTeams(NotifyBase):
'default': False,
'map_to': 'include_image',
},
'template': {
'name': _('Template Path'),
'type': 'string',
'private': True,
},
})
# Define our token control
template_kwargs = {
'tokens': {
'name': _('Template Tokens'),
'prefix': ':',
},
}
def __init__(self, token_a, token_b, token_c, include_image=True,
**kwargs):
template=None, tokens=None, **kwargs):
"""
Initialize Microsoft Teams Object
You can optional specify a template and identify arguments you
wish to populate your template with when posting. Some reserved
template arguments that can not be over-ridden are:
`body`, `title`, and `type`.
"""
super(NotifyMSTeams, self).__init__(**kwargs)
@ -186,8 +218,120 @@ class NotifyMSTeams(NotifyBase):
# Place a thumbnail image inline with the message body
self.include_image = include_image
# Our template object is just an AppriseAttachment object
self.template = AppriseAttachment(asset=self.asset)
if template:
# Add our definition to our template
self.template.add(template)
# Enforce maximum file size
self.template[0].max_file_size = self.max_msteams_template_size
# Template functionality
self.tokens = {}
if isinstance(tokens, dict):
self.tokens.update(tokens)
elif tokens:
msg = 'The specified MSTeams Template Tokens ' \
'({}) are not identified as a dictionary.'.format(tokens)
self.logger.warning(msg)
raise TypeError(msg)
# else: NoneType - this is okay
return
def gen_payload(self, body, title='', notify_type=NotifyType.INFO,
**kwargs):
"""
This function generates our payload whether it be the generic one
Apprise generates by default, or one provided by a specified
external template.
"""
# Acquire our to-be footer icon if configured to do so
image_url = None if not self.include_image \
else self.image_url(notify_type)
if not self.template:
# By default we use a generic working payload if there was
# no template specified
payload = {
"@type": "MessageCard",
"@context": "https://schema.org/extensions",
"summary": self.app_desc,
"themeColor": self.color(notify_type),
"sections": [
{
"activityImage": None,
"activityTitle": title,
"text": body,
},
]
}
if image_url:
payload['sections'][0]['activityImage'] = image_url
return payload
# If our code reaches here, then we generate ourselves the payload
template = self.template[0]
if not template:
# We could not access the attachment
self.logger.error(
'Could not access MSTeam template {}.'.format(
template.url(privacy=True)))
return False
# Take a copy of our token dictionary
tokens = self.tokens.copy()
# Apply some defaults template values
tokens['app_body'] = body
tokens['app_title'] = title
tokens['app_type'] = notify_type
tokens['app_id'] = self.app_id
tokens['app_desc'] = self.app_desc
tokens['app_color'] = self.color(notify_type)
tokens['app_image_url'] = image_url
tokens['app_url'] = self.app_url
# Enforce Application mode
tokens['app_mode'] = TemplateType.JSON
try:
with open(template.path, 'r') as fp:
content = json.loads(apply_template(fp.read(), **tokens))
except (OSError, IOError):
self.logger.error(
'MSTeam template {} could not be read.'.format(
template.url(privacy=True)))
return None
except JSONDecodeError as e:
self.logger.error(
'MSTeam template {} contains invalid JSON.'.format(
template.url(privacy=True)))
self.logger.debug('JSONDecodeError: {}'.format(e))
return None
# Load our JSON data (if valid)
has_error = False
if '@type' not in content:
self.logger.error(
'MSTeam template {} is missing @type kwarg.'.format(
template.url(privacy=True)))
has_error = True
if '@context' not in content:
self.logger.error(
'MSTeam template {} is missing @context kwarg.'.format(
template.url(privacy=True)))
has_error = True
return content if not has_error else None
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Microsoft Teams Notification
@ -205,27 +349,13 @@ class NotifyMSTeams(NotifyBase):
self.token_c,
)
# Prepare our payload
payload = {
"@type": "MessageCard",
"@context": "https://schema.org/extensions",
"summary": self.app_desc,
"themeColor": self.color(notify_type),
"sections": [
{
"activityImage": None,
"activityTitle": title,
"text": body,
},
]
}
# Acquire our to-be footer icon if configured to do so
image_url = None if not self.include_image \
else self.image_url(notify_type)
if image_url:
payload['sections'][0]['activityImage'] = image_url
# Generate our payload if it's possible
payload = self.gen_payload(
body=body, title=title, notify_type=notify_type, **kwargs)
if not payload:
# No need to present a reason; that will come from the
# gen_payload() function itself
return False
self.logger.debug('MSTeams POST URL: %s (cert_verify=%r)' % (
url, self.verify_certificate,
@ -237,7 +367,7 @@ class NotifyMSTeams(NotifyBase):
try:
r = requests.post(
url,
data=dumps(payload),
data=json.dumps(payload),
headers=headers,
verify=self.verify_certificate,
timeout=self.request_timeout,
@ -283,8 +413,14 @@ class NotifyMSTeams(NotifyBase):
'image': 'yes' if self.include_image else 'no',
}
if self.template:
params['template'] = NotifyMSTeams.quote(
self.template[0].url(), safe='')
# Extend our parameters
params.update(self.url_parameters(privacy=privacy, *args, **kwargs))
# Store any template entries if specified
params.update({':{}'.format(k): v for k, v in self.tokens.items()})
return '{schema}://{token_a}/{token_b}/{token_c}/'\
'?{params}'.format(
@ -341,6 +477,13 @@ class NotifyMSTeams(NotifyBase):
results['include_image'] = \
parse_bool(results['qsd'].get('image', True))
if 'template' in results['qsd'] and results['qsd']['template']:
results['template'] = \
NotifyMSTeams.unquote(results['qsd']['template'])
# Store our tokens
results['tokens'] = results['qsd:']
return results
@staticmethod

View File

@ -25,6 +25,7 @@
import re
import six
import json
import contextlib
import os
from os.path import expanduser
@ -95,9 +96,10 @@ TIDY_NUX_TRIM_RE = re.compile(
# The handling of custom arguments passed in the URL; we treat any
# argument (which would otherwise appear in the qsd area of our parse_url()
# function differently if they start with a + or - value
# function differently if they start with a +, - or : value
NOTIFY_CUSTOM_ADD_TOKENS = re.compile(r'^( |\+)(?P<key>.*)\s*')
NOTIFY_CUSTOM_DEL_TOKENS = re.compile(r'^-(?P<key>.*)\s*')
NOTIFY_CUSTOM_COLON_TOKENS = re.compile(r'^:(?P<key>.*)\s*')
# Used for attempting to acquire the schema if the URL can't be parsed.
GET_SCHEMA_RE = re.compile(r'\s*(?P<schema>[a-z0-9]{2,9})://.*$', re.I)
@ -141,6 +143,19 @@ EMAIL_DETECTION_RE = re.compile(
REGEX_VALIDATE_LOOKUP = {}
class TemplateType(object):
"""
Defines the different template types we can perform parsing on
"""
# RAW does nothing at all to the content being parsed
# data is taken at it's absolute value
RAW = 'raw'
# Data is presumed to be of type JSON and is therefore escaped
# if required to do so (such as single quotes)
JSON = 'json'
def is_ipaddr(addr, ipv4=True, ipv6=True):
"""
Validates against IPV4 and IPV6 IP Addresses
@ -318,10 +333,11 @@ def parse_qsd(qs):
'qsd': {},
# Detected Entries that start with + or - are additionally stored in
# these values (un-touched). The +/- however are stripped from their
# these values (un-touched). The :,+,- however are stripped from their
# name before they are stored here.
'qsd+': {},
'qsd-': {},
'qsd:': {},
}
pairs = [s2 for s1 in qs.split('&') for s2 in s1.split(';')]
@ -361,6 +377,12 @@ def parse_qsd(qs):
# Store content 'as-is'
result['qsd-'][k.group('key')] = val
# Check for tokens that start with a colon symbol (:)
k = NOTIFY_CUSTOM_COLON_TOKENS.match(key)
if k is not None:
# Store content 'as-is'
result['qsd:'][k.group('key')] = val
return result
@ -418,11 +440,12 @@ def parse_url(url, default_schema='http', verify_host=True):
# qsd = Query String Dictionary
'qsd': {},
# Detected Entries that start with + or - are additionally stored in
# these values (un-touched). The +/- however are stripped from their
# name before they are stored here.
# Detected Entries that start with +, - or : are additionally stored in
# these values (un-touched). The +, -, and : however are stripped
# from their name before they are stored here.
'qsd+': {},
'qsd-': {},
'qsd:': {},
}
qsdata = ''
@ -845,3 +868,45 @@ def environ(*remove, **update):
finally:
# Restore our snapshot
os.environ = env_orig.copy()
def apply_template(template, app_mode=TemplateType.RAW, **kwargs):
"""
Takes a template in a str format and applies all of the keywords
and their values to it.
The app$mode is used to dictact any pre-processing that needs to take place
to the escaped string prior to it being placed. The idea here is for
elements to be placed in a JSON response for example should be escaped
early in their string format.
The template must contain keywords wrapped in in double
squirly braces like {{keyword}}. These are matched to the respected
kwargs passed into this function.
If there is no match found, content is not swapped.
"""
def _escape_raw(content):
# No escaping necessary
return content
def _escape_json(content):
# remove surounding quotes
return json.dumps(content)[1:-1]
# Our escape function
fn = _escape_json if app_mode == TemplateType.JSON else _escape_raw
lookup = [re.escape(x) for x in kwargs.keys()]
# Compile this into a list
mask_r = re.compile(
re.escape('{{') + r'\s*(' + '|'.join(lookup) + r')\s*'
+ re.escape('}}'), re.IGNORECASE)
# we index 2 characters off the head and 2 characters from the tail
# to drop the '{{' and '}}' surrounding our match so that we can
# re-index it back into our list
return mask_r.sub(lambda x: fn(kwargs[x.group()[2:-2].strip()]), template)

View File

@ -776,18 +776,17 @@ def test_apprise_asset(tmpdir):
)
a.default_html_color = '#abcabc'
a.html_notify_map[NotifyType.INFO] = '#aaaaaa'
assert a.color('invalid', tuple) == (171, 202, 188)
assert a.color(NotifyType.INFO, tuple) == (170, 170, 170)
assert a.color(NotifyType.INFO, tuple) == (58, 163, 227)
assert a.color('invalid', int) == 11258556
assert a.color(NotifyType.INFO, int) == 11184810
assert a.color(NotifyType.INFO, int) == 3843043
assert a.color('invalid', None) == '#abcabc'
assert a.color(NotifyType.INFO, None) == '#aaaaaa'
assert a.color(NotifyType.INFO, None) == '#3AA3E3'
# None is the default
assert a.color(NotifyType.INFO) == '#aaaaaa'
assert a.color(NotifyType.INFO) == '#3AA3E3'
# Invalid Type
with pytest.raises(ValueError):
@ -1203,7 +1202,7 @@ def test_apprise_details_plugin_verification():
assert isinstance(arg['prefix'], six.string_types)
if section == 'kwargs':
# The only acceptable prefix types for kwargs
assert arg['prefix'] in ('+', '-')
assert arg['prefix'] in (':', '+', '-')
else:
# kwargs requires that the 'prefix' is defined

View File

@ -605,7 +605,7 @@ urls:
""", asset=asset)
# We expect to parse 4 entries from the above because the tgram:// entry
# We expect to parse 6 entries from the above because the tgram:// entry
# would have failed to be loaded
assert isinstance(result, list)
assert len(result) == 6
@ -884,6 +884,20 @@ include:
assert 'http://localhost/apprise/cfg02' in config
assert 'http://localhost/apprise/cfg03' in config
# Test a configuration with an invalid schema with options
result, config = ConfigBase.config_parse_yaml("""
urls:
- invalid://:
tag: 'invalid'
:name: 'Testing2'
:body: 'test body2'
:title: 'test title2'
""", asset=asset)
# We will have loaded no results
assert isinstance(result, list)
assert len(result) == 0
# Valid Configuration (we allow comma separated entries for
# each defined bullet)
result, config = ConfigBase.config_parse_yaml("""

548
test/test_msteam_plugin.py Normal file
View File

@ -0,0 +1,548 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2020 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import mock
import json
import requests
import pytest
from apprise import Apprise
from apprise import AppriseConfig
from apprise import plugins
from apprise import NotifyType
# Disable logging for a cleaner testing output
import logging
logging.disable(logging.CRITICAL)
@mock.patch('requests.post')
def test_msteams_templating(mock_post, tmpdir):
"""
API: NotifyMSTeams() Templating
"""
# Disable Throttling to speed testing
plugins.NotifyBase.request_rate_per_sec = 0
# Prepare Mock
mock_post.return_value = requests.Request()
mock_post.return_value.status_code = requests.codes.ok
uuid4 = '8b799edf-6f98-4d3a-9be7-2862fb4e5752'
url = 'msteams://{}@{}/{}/{}'.format(uuid4, uuid4, 'a' * 32, uuid4)
# Test cases where our URL is invalid
template = tmpdir.join("simple.json")
template.write("""
{
"@type": "MessageCard",
"@context": "https://schema.org/extensions",
"summary": "{{app_id}}",
"themeColor": "{{app_color}}",
"sections": [
{
"activityImage": null,
"activityTitle": "{{app_title}}",
"text": "{{app_body}}"
}
]
}
""")
# Instantiate our URL
obj = Apprise.instantiate('{url}/?template={template}&{kwargs}'.format(
url=url,
template=str(template),
kwargs=':key1=token&:key2=token',
))
assert isinstance(obj, plugins.NotifyMSTeams)
assert obj.notify(
body="body", title='title',
notify_type=NotifyType.INFO) is True
assert mock_post.called is True
assert mock_post.call_args_list[0][0][0].startswith(
'https://outlook.office.com/webhook/')
# Our Posted JSON Object
posted_json = json.loads(mock_post.call_args_list[0][1]['data'])
assert 'summary' in posted_json
assert posted_json['summary'] == 'Apprise'
assert posted_json['themeColor'] == '#3AA3E3'
assert posted_json['sections'][0]['activityTitle'] == 'title'
assert posted_json['sections'][0]['text'] == 'body'
# Test invalid JSON
# Test cases where our URL is invalid
template = tmpdir.join("invalid.json")
template.write("}")
# Instantiate our URL
obj = Apprise.instantiate('{url}/?template={template}&{kwargs}'.format(
url=url,
template=str(template),
kwargs=':key1=token&:key2=token',
))
assert isinstance(obj, plugins.NotifyMSTeams)
# We will fail to preform our notifcation because the JSON is bad
assert obj.notify(
body="body", title='title',
notify_type=NotifyType.INFO) is False
# Test cases where we're missing the @type part of the URL
template = tmpdir.join("missing_type.json")
template.write("""
{
"@context": "https://schema.org/extensions",
"summary": "{{app_id}}",
"themeColor": "{{app_color}}",
"sections": [
{
"activityImage": null,
"activityTitle": "{{app_title}}",
"text": "{{app_body}}"
}
]
}
""")
# Instantiate our URL
obj = Apprise.instantiate('{url}/?template={template}&{kwargs}'.format(
url=url,
template=str(template),
kwargs=':key1=token&:key2=token',
))
assert isinstance(obj, plugins.NotifyMSTeams)
# We can not load the file because we're missing the @type entry
assert obj.notify(
body="body", title='title',
notify_type=NotifyType.INFO) is False
# Test cases where we're missing the @context part of the URL
template = tmpdir.join("missing_context.json")
template.write("""
{
"@type": "MessageCard",
"summary": "{{app_id}}",
"themeColor": "{{app_color}}",
"sections": [
{
"activityImage": null,
"activityTitle": "{{app_title}}",
"text": "{{app_body}}"
}
]
}
""")
# Instantiate our URL
obj = Apprise.instantiate('{url}/?template={template}&{kwargs}'.format(
url=url,
template=str(template),
kwargs=':key1=token&:key2=token',
))
assert isinstance(obj, plugins.NotifyMSTeams)
# We can not load the file because we're missing the @context entry
assert obj.notify(
body="body", title='title',
notify_type=NotifyType.INFO) is False
# Test a case where we can not access the file:
with mock.patch('json.loads', side_effect=OSError):
# we fail, but this time it's because we couldn't
# access the cached file contents for reading
assert obj.notify(
body="body", title='title',
notify_type=NotifyType.INFO) is False
# A more complicated example; uses a target
mock_post.reset_mock()
template = tmpdir.join("more_complicated_example.json")
template.write("""
{
"@type": "MessageCard",
"@context": "https://schema.org/extensions",
"summary": "{{app_desc}}",
"themeColor": "{{app_color}}",
"sections": [
{
"activityImage": null,
"activityTitle": "{{app_title}}",
"text": "{{app_body}}"
}
],
"potentialAction": [{
"@type": "ActionCard",
"name": "Add a comment",
"inputs": [{
"@type": "TextInput",
"id": "comment",
"isMultiline": false,
"title": "Add a comment here for this task."
}],
"actions": [{
"@type": "HttpPOST",
"name": "Add Comment",
"target": "{{ target }}"
}]
}]
}
""")
# Instantiate our URL
obj = Apprise.instantiate('{url}/?template={template}&{kwargs}'.format(
url=url,
template=str(template),
kwargs=':key1=token&:key2=token&:target=http://localhost',
))
assert isinstance(obj, plugins.NotifyMSTeams)
assert obj.notify(
body="body", title='title',
notify_type=NotifyType.INFO) is True
assert mock_post.called is True
assert mock_post.call_args_list[0][0][0].startswith(
'https://outlook.office.com/webhook/')
# Our Posted JSON Object
posted_json = json.loads(mock_post.call_args_list[0][1]['data'])
assert 'summary' in posted_json
assert posted_json['summary'] == 'Apprise Notifications'
assert posted_json['themeColor'] == '#3AA3E3'
assert posted_json['sections'][0]['activityTitle'] == 'title'
assert posted_json['sections'][0]['text'] == 'body'
# We even parsed our entry out of the URL
assert posted_json['potentialAction'][0]['actions'][0]['target'] \
== 'http://localhost'
@mock.patch('requests.post')
def test_msteams_yaml_config(mock_post, tmpdir):
"""
API: NotifyMSTeams() YAML Configuration Entries
"""
# Disable Throttling to speed testing
plugins.NotifyBase.request_rate_per_sec = 0
# Prepare Mock
mock_post.return_value = requests.Request()
mock_post.return_value.status_code = requests.codes.ok
uuid4 = '8b799edf-6f98-4d3a-9be7-2862fb4e5752'
url = 'msteams://{}@{}/{}/{}'.format(uuid4, uuid4, 'a' * 32, uuid4)
# Test cases where our URL is invalid
template = tmpdir.join("simple.json")
template.write("""
{
"@type": "MessageCard",
"@context": "https://schema.org/extensions",
"summary": "{{name}}",
"themeColor": "{{app_color}}",
"sections": [
{
"activityImage": null,
"activityTitle": "{{title}}",
"text": "{{body}}"
}
]
}
""")
# Test Invalid Filename
config = tmpdir.join("msteams01.yml")
config.write("""
urls:
- {url}:
- tag: 'msteams'
template: {template}.missing
:name: 'Template.Missing'
:body: 'test body'
:title: 'test title'
""".format(url=url, template=str(template)))
cfg = AppriseConfig()
cfg.add(str(config))
assert len(cfg) == 1
assert len(cfg[0]) == 1
obj = cfg[0][0]
assert isinstance(obj, plugins.NotifyMSTeams)
assert obj.notify(
body="body", title='title',
notify_type=NotifyType.INFO) is False
assert mock_post.called is False
# Test token identifiers
config = tmpdir.join("msteams01.yml")
config.write("""
urls:
- {url}:
- tag: 'msteams'
template: {template}
:name: 'Testing'
:body: 'test body'
:title: 'test title'
""".format(url=url, template=str(template)))
cfg = AppriseConfig()
cfg.add(str(config))
assert len(cfg) == 1
assert len(cfg[0]) == 1
obj = cfg[0][0]
assert isinstance(obj, plugins.NotifyMSTeams)
assert obj.notify(
body="body", title='title',
notify_type=NotifyType.INFO) is True
assert mock_post.called is True
assert mock_post.call_args_list[0][0][0].startswith(
'https://outlook.office.com/webhook/')
# Our Posted JSON Object
posted_json = json.loads(mock_post.call_args_list[0][1]['data'])
assert 'summary' in posted_json
assert posted_json['summary'] == 'Testing'
assert posted_json['themeColor'] == '#3AA3E3'
assert posted_json['sections'][0]['activityTitle'] == 'test title'
assert posted_json['sections'][0]['text'] == 'test body'
#
# Now again but without a bullet under the url definition
#
mock_post.reset_mock()
config = tmpdir.join("msteams02.yml")
config.write("""
urls:
- {url}:
tag: 'msteams'
template: {template}
:name: 'Testing2'
:body: 'test body2'
:title: 'test title2'
""".format(url=url, template=str(template)))
cfg = AppriseConfig()
cfg.add(str(config))
assert len(cfg) == 1
assert len(cfg[0]) == 1
obj = cfg[0][0]
assert isinstance(obj, plugins.NotifyMSTeams)
assert obj.notify(
body="body", title='title',
notify_type=NotifyType.INFO) is True
assert mock_post.called is True
assert mock_post.call_args_list[0][0][0].startswith(
'https://outlook.office.com/webhook/')
# Our Posted JSON Object
posted_json = json.loads(mock_post.call_args_list[0][1]['data'])
assert 'summary' in posted_json
assert posted_json['summary'] == 'Testing2'
assert posted_json['themeColor'] == '#3AA3E3'
assert posted_json['sections'][0]['activityTitle'] == 'test title2'
assert posted_json['sections'][0]['text'] == 'test body2'
#
# Try again but store the content as a dictionary in the cofiguration file
#
mock_post.reset_mock()
config = tmpdir.join("msteams03.yml")
config.write("""
urls:
- {url}:
- tag: 'msteams'
template: {template}
tokens:
name: 'Testing3'
body: 'test body3'
title: 'test title3'
""".format(url=url, template=str(template)))
cfg = AppriseConfig()
cfg.add(str(config))
assert len(cfg) == 1
assert len(cfg[0]) == 1
obj = cfg[0][0]
assert isinstance(obj, plugins.NotifyMSTeams)
assert obj.notify(
body="body", title='title',
notify_type=NotifyType.INFO) is True
assert mock_post.called is True
assert mock_post.call_args_list[0][0][0].startswith(
'https://outlook.office.com/webhook/')
# Our Posted JSON Object
posted_json = json.loads(mock_post.call_args_list[0][1]['data'])
assert 'summary' in posted_json
assert posted_json['summary'] == 'Testing3'
assert posted_json['themeColor'] == '#3AA3E3'
assert posted_json['sections'][0]['activityTitle'] == 'test title3'
assert posted_json['sections'][0]['text'] == 'test body3'
#
# Now again but without a bullet under the url definition
#
mock_post.reset_mock()
config = tmpdir.join("msteams04.yml")
config.write("""
urls:
- {url}:
tag: 'msteams'
template: {template}
tokens:
name: 'Testing4'
body: 'test body4'
title: 'test title4'
""".format(url=url, template=str(template)))
cfg = AppriseConfig()
cfg.add(str(config))
assert len(cfg) == 1
assert len(cfg[0]) == 1
obj = cfg[0][0]
assert isinstance(obj, plugins.NotifyMSTeams)
assert obj.notify(
body="body", title='title',
notify_type=NotifyType.INFO) is True
assert mock_post.called is True
assert mock_post.call_args_list[0][0][0].startswith(
'https://outlook.office.com/webhook/')
# Our Posted JSON Object
posted_json = json.loads(mock_post.call_args_list[0][1]['data'])
assert 'summary' in posted_json
assert posted_json['summary'] == 'Testing4'
assert posted_json['themeColor'] == '#3AA3E3'
assert posted_json['sections'][0]['activityTitle'] == 'test title4'
assert posted_json['sections'][0]['text'] == 'test body4'
# Now let's do a combination of the two
mock_post.reset_mock()
config = tmpdir.join("msteams05.yml")
config.write("""
urls:
- {url}:
- tag: 'msteams'
template: {template}
tokens:
body: 'test body5'
title: 'test title5'
:name: 'Testing5'
""".format(url=url, template=str(template)))
cfg = AppriseConfig()
cfg.add(str(config))
assert len(cfg) == 1
assert len(cfg[0]) == 1
obj = cfg[0][0]
assert isinstance(obj, plugins.NotifyMSTeams)
assert obj.notify(
body="body", title='title',
notify_type=NotifyType.INFO) is True
assert mock_post.called is True
assert mock_post.call_args_list[0][0][0].startswith(
'https://outlook.office.com/webhook/')
# Our Posted JSON Object
posted_json = json.loads(mock_post.call_args_list[0][1]['data'])
assert 'summary' in posted_json
assert posted_json['summary'] == 'Testing5'
assert posted_json['themeColor'] == '#3AA3E3'
assert posted_json['sections'][0]['activityTitle'] == 'test title5'
assert posted_json['sections'][0]['text'] == 'test body5'
# Now let's do a test where our tokens is not the expected
# dictionary we want to see
mock_post.reset_mock()
config = tmpdir.join("msteams06.yml")
config.write("""
urls:
- {url}:
- tag: 'msteams'
template: {template}
# Not a dictionary
tokens:
body
""".format(url=url, template=str(template)))
cfg = AppriseConfig()
cfg.add(str(config))
assert len(cfg) == 1
# It could not load because of invalid tokens
assert len(cfg[0]) == 0
def test_notify_msteams_plugin():
"""
API: NotifyMSTeams() Extra Checks
"""
# Initializes the plugin with an invalid token
with pytest.raises(TypeError):
plugins.NotifyMSTeams(token_a=None, token_b='abcd', token_c='abcd')
# Whitespace also acts as an invalid token value
with pytest.raises(TypeError):
plugins.NotifyMSTeams(token_a=' ', token_b='abcd', token_c='abcd')
with pytest.raises(TypeError):
plugins.NotifyMSTeams(token_a='abcd', token_b=None, token_c='abcd')
# Whitespace also acts as an invalid token value
with pytest.raises(TypeError):
plugins.NotifyMSTeams(token_a='abcd', token_b=' ', token_c='abcd')
with pytest.raises(TypeError):
plugins.NotifyMSTeams(token_a='abcd', token_b='abcd', token_c=None)
# Whitespace also acts as an invalid token value
with pytest.raises(TypeError):
plugins.NotifyMSTeams(token_a='abcd', token_b='abcd', token_c=' ')
uuid4 = '8b799edf-6f98-4d3a-9be7-2862fb4e5752'
token_a = '{}@{}'.format(uuid4, uuid4)
token_b = 'A' * 32
# test case where no tokens are specified
obj = plugins.NotifyMSTeams(
token_a=token_a, token_b=token_b, token_c=uuid4)
assert isinstance(obj, plugins.NotifyMSTeams)

View File

@ -5256,31 +5256,6 @@ def test_notify_msg91_plugin(mock_post):
plugins.NotifyMSG91(authkey=" ", targets=target)
def test_notify_msteams_plugin():
"""
API: NotifyMSTeams() Extra Checks
"""
# Initializes the plugin with an invalid token
with pytest.raises(TypeError):
plugins.NotifyMSTeams(token_a=None, token_b='abcd', token_c='abcd')
# Whitespace also acts as an invalid token value
with pytest.raises(TypeError):
plugins.NotifyMSTeams(token_a=' ', token_b='abcd', token_c='abcd')
with pytest.raises(TypeError):
plugins.NotifyMSTeams(token_a='abcd', token_b=None, token_c='abcd')
# Whitespace also acts as an invalid token value
with pytest.raises(TypeError):
plugins.NotifyMSTeams(token_a='abcd', token_b=' ', token_c='abcd')
with pytest.raises(TypeError):
plugins.NotifyMSTeams(token_a='abcd', token_b='abcd', token_c=None)
# Whitespace also acts as an invalid token value
with pytest.raises(TypeError):
plugins.NotifyMSTeams(token_a='abcd', token_b='abcd', token_c=' ')
def test_notify_prowl_plugin():
"""
API: NotifyProwl() Extra Checks

View File

@ -26,6 +26,7 @@
from __future__ import print_function
import re
import os
import six
try:
# Python 2.7
from urllib import unquote
@ -46,10 +47,11 @@ def test_parse_qsd():
result = utils.parse_qsd('a=1&b=&c&d=abcd')
assert isinstance(result, dict) is True
assert len(result) == 3
assert len(result) == 4
assert 'qsd' in result
assert 'qsd+' in result
assert 'qsd-' in result
assert 'qsd:' in result
assert len(result['qsd']) == 4
assert 'a' in result['qsd']
@ -59,6 +61,7 @@ def test_parse_qsd():
assert len(result['qsd-']) == 0
assert len(result['qsd+']) == 0
assert len(result['qsd:']) == 0
def test_parse_url():
@ -77,6 +80,7 @@ def test_parse_url():
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
result = utils.parse_url('http://hostname/')
assert result['schema'] == 'http'
@ -91,6 +95,7 @@ def test_parse_url():
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
# colon after hostname without port number is no good
assert utils.parse_url('http://hostname:') is None
@ -109,6 +114,7 @@ def test_parse_url():
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
# A port of Zero is not valid
assert utils.parse_url('http://hostname:0') is None
@ -127,6 +133,7 @@ def test_parse_url():
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
result = utils.parse_url('http://hostname/?-KeY=Value')
assert result['schema'] == 'http'
@ -143,6 +150,7 @@ def test_parse_url():
assert 'KeY' in result['qsd-']
assert unquote(result['qsd-']['KeY']) == 'Value'
assert result['qsd+'] == {}
assert result['qsd:'] == {}
result = utils.parse_url('http://hostname/?+KeY=Value')
assert result['schema'] == 'http'
@ -158,9 +166,26 @@ def test_parse_url():
assert 'KeY' in result['qsd+']
assert result['qsd+']['KeY'] == 'Value'
assert result['qsd-'] == {}
assert result['qsd:'] == {}
result = utils.parse_url('http://hostname/?:kEy=vALUE')
assert result['schema'] == 'http'
assert result['host'] == 'hostname'
assert result['port'] is None
assert result['user'] is None
assert result['password'] is None
assert result['fullpath'] == '/'
assert result['path'] == '/'
assert result['query'] is None
assert result['url'] == 'http://hostname/'
assert ':key' in result['qsd']
assert 'kEy' in result['qsd:']
assert result['qsd:']['kEy'] == 'vALUE'
assert result['qsd+'] == {}
assert result['qsd-'] == {}
result = utils.parse_url(
'http://hostname/?+KeY=ValueA&-kEy=ValueB&KEY=Value%20+C')
'http://hostname/?+KeY=ValueA&-kEy=ValueB&KEY=Value%20+C&:colon=y')
assert result['schema'] == 'http'
assert result['host'] == 'hostname'
assert result['port'] is None
@ -172,6 +197,8 @@ def test_parse_url():
assert result['url'] == 'http://hostname/'
assert '+key' in result['qsd']
assert '-key' in result['qsd']
assert ':colon' in result['qsd']
assert result['qsd:']['colon'] == 'y'
assert 'key' in result['qsd']
assert 'KeY' in result['qsd+']
assert result['qsd+']['KeY'] == 'ValueA'
@ -194,6 +221,7 @@ def test_parse_url():
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
result = utils.parse_url('http://hostname:40////')
assert result['schema'] == 'http'
@ -208,6 +236,7 @@ def test_parse_url():
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
result = utils.parse_url('HTTP://HoStNaMe:40/test.php')
assert result['schema'] == 'http'
@ -222,6 +251,7 @@ def test_parse_url():
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
result = utils.parse_url('HTTPS://user@hostname/test.py')
assert result['schema'] == 'https'
@ -236,6 +266,7 @@ def test_parse_url():
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
result = utils.parse_url(' HTTPS://///user@@@hostname///test.py ')
assert result['schema'] == 'https'
@ -250,6 +281,7 @@ def test_parse_url():
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
result = utils.parse_url(
'HTTPS://user:password@otherHost/full///path/name/',
@ -266,6 +298,7 @@ def test_parse_url():
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
# Handle garbage
assert utils.parse_url(None) is None
@ -293,6 +326,7 @@ def test_parse_url():
assert unquote(result['qsd']['format']) == 'text'
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
# Test Passwords with question marks ?; not supported
result = utils.parse_url(
@ -316,6 +350,7 @@ def test_parse_url():
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
# just host and path
result = utils.parse_url('invalid/host')
@ -331,6 +366,7 @@ def test_parse_url():
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
# just all out invalid
assert utils.parse_url('?') is None
@ -362,6 +398,7 @@ def test_parse_url():
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
result = utils.parse_url('testhostname')
assert result['schema'] == 'http'
@ -376,6 +413,8 @@ def test_parse_url():
assert result['url'] == 'http://testhostname'
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
result = utils.parse_url('example.com', default_schema='unknown')
assert result['schema'] == 'unknown'
@ -390,6 +429,8 @@ def test_parse_url():
assert result['url'] == 'unknown://example.com'
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
# An empty string without a hostame is still valid if verify_host is set
result = utils.parse_url('', verify_host=False)
@ -405,6 +446,8 @@ def test_parse_url():
assert result['url'] == 'http://'
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
# A messed up URL
result = utils.parse_url('test://:@/', verify_host=False)
@ -419,6 +462,8 @@ def test_parse_url():
assert result['url'] == 'test://:@/'
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
result = utils.parse_url('crazy://:@//_/@^&/jack.json', verify_host=False)
assert result['schema'] == 'crazy'
@ -432,6 +477,8 @@ def test_parse_url():
assert unquote(result['url']) == 'crazy://:@/_/@^&/jack.json'
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
def test_parse_bool():
@ -1159,3 +1206,61 @@ def test_environ_temporary_change():
# Even our temporary variables are now missing
assert n_key not in os.environ
assert d_key not in os.environ
def test_apply_templating():
"""utils: apply_template() testing
"""
template = "Hello {{fname}}, How are you {{whence}}?"
result = utils.apply_template(
template, **{'fname': 'Chris', 'whence': 'this morning'})
assert isinstance(result, six.string_types) is True
assert result == "Hello Chris, How are you this morning?"
# In this example 'whence' isn't provided, so it isn't swapped
result = utils.apply_template(
template, **{'fname': 'Chris'})
assert isinstance(result, six.string_types) is True
assert result == "Hello Chris, How are you {{whence}}?"
# white space won't cause any ill affects:
template = "Hello {{ fname }}, How are you {{ whence}}?"
result = utils.apply_template(
template, **{'fname': 'Chris', 'whence': 'this morning'})
assert isinstance(result, six.string_types) is True
assert result == "Hello Chris, How are you this morning?"
# No arguments won't cause any problems
template = "Hello {{fname}}, How are you {{whence}}?"
result = utils.apply_template(template)
assert isinstance(result, six.string_types) is True
assert result == template
# Wrong elements are simply ignored
result = utils.apply_template(
template,
**{'fname': 'l2g', 'whence': 'this evening', 'ignore': 'me'})
assert isinstance(result, six.string_types) is True
assert result == "Hello l2g, How are you this evening?"
# Empty template makes things easy
result = utils.apply_template(
"", **{'fname': 'l2g', 'whence': 'this evening'})
assert isinstance(result, six.string_types) is True
assert result == ""
# Regular expressions are safely escapped and act as normal
# tokens:
template = "Hello {{.*}}, How are you {{[A-Z0-9]+}}?"
result = utils.apply_template(
template, **{'.*': 'l2g', '[A-Z0-9]+': 'this afternoon'})
assert result == "Hello l2g, How are you this afternoon?"
# JSON is handled too such as escaping quotes
template = '{value: "{{ value }}"}'
result = utils.apply_template(
template, app_mode=utils.TemplateType.JSON,
**{'value': '"quotes are escaped"'})
assert result == '{value: "\\"quotes are escaped\\""}'