Merge pull request #173 from caronc/add-attachments

File Attachment Support Added to Apprise; refs #172
pull/175/head
Chris Caron 2019-11-16 22:22:06 -05:00 committed by GitHub
commit e5c0334b90
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 3346 additions and 334 deletions

View File

@ -10,7 +10,7 @@ To inform or tell (someone). To make one aware of something.
* One notification library to rule them all.
* A common and intuitive notification syntax.
* Supports the handling of images (to the notification services that will accept them).
* Supports the handling of images and attachments (to the notification services that will accept them).
System owners who wish to provide a notification service no longer need to research each and every new one as they appear. They just need to include this one library and then they can immediately gain access to almost all of the notifications services available to us today.
@ -153,6 +153,23 @@ apprise -t 'my title' -b 'my notification body' \
--config=https://localhost/my/apprise/config
```
### Attaching Files
Apprise also supports file attachments too! Specify as many attachments to a notification as you want.
```bash
# Send a funny image you found on the internet to a colleage:
apprise --title 'Agile Joke' \
--body 'Did you see this one yet?' \
--attach https://i.redd.it/my2t4d2fx0u31.jpg \
'mailto://myemail:mypass@gmail.com'
# Easily send an update from a critical server to your dev team
apprise --title 'system crash' \
--body 'I do not think Jim fixed the bug; see attached...' \
--attach /var/log/myprogram.log \
--attach /var/debug/core.2345 \
--tag devteam
```
## Developers
To send a notification from within your python application, just do the following:
```python
@ -228,4 +245,56 @@ apobj.notify(
)
```
### Attaching Files
Attachments are very easy to send using the Apprise API:
```python
import apprise
# Create an Apprise instance
apobj = apprise.Apprise()
# Add at least one service you want to notify
apobj.add('mailto://myuser:mypass@hotmail.com')
# Then send your attachment.
apobj.notify(
title='A great photo of our family',
body='The flash caused Jane to close her eyes! hah! :)',
attach='/local/path/to/my/DSC_003.jpg',
)
# Send a web based attachment too! In the below example, we connect to a home
# security camera and send a live image to an email. By default remote web
# content is cached but for a security camera, we might want to call notify
# again later in our code so we want our last image retrieved to expire(in
# this case after 3 seconds).
apobj.notify(
title='Latest security image',
attach='http:/admin:password@hikvision-cam01/ISAPI/Streaming/channels/101/picture?cache=3'
)
# To send more than one attachment, you just need another object:
from apprise import AppriseAttachment
# Initialize our attachment object
aa = AppriseAttachment()
# Now add all of the entries we're intrested in:
# ?name= allows us to rename the actual jpeg as found on the site
# to be another name when sent to our receipient(s)
aa.add('https://i.redd.it/my2t4d2fx0u31.jpg?name=FlyingToMars.jpg')
# Now add another:
aa.add('/path/to/funny/joke.gif')
# Send your multiple attachments with a single notify call:
apobj.notify(
title='Some good jokes.'
body='Hey guys, check out these!'
attach=aa,
tag=friends
)
```
## Want To Learn More?
If you're interested in reading more about this and other methods on how to customize your own notifications, please check out the wiki at https://github.com/caronc/apprise/wiki/Development_API. You can also find more examples on how to use the command line tool at: https://github.com/caronc/apprise/wiki/CLI_Usage.

View File

@ -38,6 +38,7 @@ from .logger import logger
from .AppriseAsset import AppriseAsset
from .AppriseConfig import AppriseConfig
from .AppriseAttachment import AppriseAttachment
from .AppriseLocale import AppriseLocale
from .config.ConfigBase import ConfigBase
from .plugins.NotifyBase import NotifyBase
@ -277,7 +278,7 @@ class Apprise(object):
return
def notify(self, body, title='', notify_type=NotifyType.INFO,
body_format=None, tag=MATCH_ALL_TAG):
body_format=None, tag=MATCH_ALL_TAG, attach=None):
"""
Send a notification to all of the plugins previously loaded.
@ -293,6 +294,10 @@ class Apprise(object):
sent, False if even just one of them fails, and None if no
notifications were sent at all as a result of tag filtering and/or
simply having empty configuration files that were read.
Attach can contain a list of attachment URLs. attach can also be
represented by a an AttachBase() (or list of) object(s). This
identifies the products you wish to notify
"""
if len(self) == 0:
@ -309,6 +314,15 @@ class Apprise(object):
# Tracks conversions
conversion_map = dict()
# Prepare attachments if required
if attach is not None and not isinstance(attach, AppriseAttachment):
try:
attach = AppriseAttachment(attach, asset=self.asset)
except TypeError:
# bad attachments
return False
# Iterate over our loaded plugins
for server in self.find(tag):
if status is None:
@ -371,7 +385,8 @@ class Apprise(object):
if not server.notify(
body=conversion_map[server.notify_format],
title=title,
notify_type=notify_type):
notify_type=notify_type,
attach=attach):
# Toggle our return status flag
status = False

View File

@ -0,0 +1,270 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import six
from . import attachment
from . import URLBase
from .AppriseAsset import AppriseAsset
from .logger import logger
from .utils import GET_SCHEMA_RE
class AppriseAttachment(object):
"""
Our Apprise Attachment File Manager
"""
def __init__(self, paths=None, asset=None, cache=True, **kwargs):
"""
Loads all of the paths/urls specified (if any).
The path can either be a single string identifying one explicit
location, otherwise you can pass in a series of locations to scan
via a list.
By default we cache our responses so that subsiquent calls does not
cause the content to be retrieved again. For local file references
this makes no difference at all. But for remote content, this does
mean more then one call can be made to retrieve the (same) data. This
method can be somewhat inefficient if disabled. Only disable caching
if you understand the consequences.
You can alternatively set the cache value to an int identifying the
number of seconds the previously retrieved can exist for before it
should be considered expired.
It's also worth nothing that the cache value is only set to elements
that are not already of subclass AttachBase()
"""
# Initialize our attachment listings
self.attachments = list()
# Set our cache flag
self.cache = cache
# Prepare our Asset Object
self.asset = \
asset if isinstance(asset, AppriseAsset) else AppriseAsset()
# Now parse any paths specified
if paths is not None:
# Store our path(s)
if not self.add(paths):
# Parse Source domain based on from_addr
raise TypeError("One or more attachments could not be added.")
def add(self, attachments, asset=None, cache=None):
"""
Adds one or more attachments into our list.
By default we cache our responses so that subsiquent calls does not
cause the content to be retrieved again. For local file references
this makes no difference at all. But for remote content, this does
mean more then one call can be made to retrieve the (same) data. This
method can be somewhat inefficient if disabled. Only disable caching
if you understand the consequences.
You can alternatively set the cache value to an int identifying the
number of seconds the previously retrieved can exist for before it
should be considered expired.
It's also worth nothing that the cache value is only set to elements
that are not already of subclass AttachBase()
"""
# Initialize our return status
return_status = True
# Initialize our default cache value
cache = cache if cache is not None else self.cache
if isinstance(asset, AppriseAsset):
# prepare default asset
asset = self.asset
if isinstance(attachments, attachment.AttachBase):
# Go ahead and just add our attachments into our list
self.attachments.append(attachments)
return True
elif isinstance(attachments, six.string_types):
# Save our path
attachments = (attachments, )
elif not isinstance(attachments, (tuple, set, list)):
logger.error(
'An invalid attachment url (type={}) was '
'specified.'.format(type(attachments)))
return False
# Iterate over our attachments
for _attachment in attachments:
if isinstance(_attachment, attachment.AttachBase):
# Go ahead and just add our attachment into our list
self.attachments.append(_attachment)
continue
elif not isinstance(_attachment, six.string_types):
logger.warning(
"An invalid attachment (type={}) was specified.".format(
type(_attachment)))
return_status = False
continue
logger.debug("Loading attachment: {}".format(_attachment))
# Instantiate ourselves an object, this function throws or
# returns None if it fails
instance = AppriseAttachment.instantiate(
_attachment, asset=asset, cache=cache)
if not isinstance(instance, attachment.AttachBase):
return_status = False
continue
# Add our initialized plugin to our server listings
self.attachments.append(instance)
# Return our status
return return_status
@staticmethod
def instantiate(url, asset=None, cache=None, suppress_exceptions=True):
"""
Returns the instance of a instantiated attachment plugin based on
the provided Attachment URL. If the url fails to be parsed, then None
is returned.
A specified cache value will over-ride anything set
"""
# Attempt to acquire the schema at the very least to allow our
# attachment based urls.
schema = GET_SCHEMA_RE.match(url)
if schema is None:
# Plan B is to assume we're dealing with a file
schema = attachment.AttachFile.protocol
url = '{}://{}'.format(schema, URLBase.quote(url))
else:
# Ensure our schema is always in lower case
schema = schema.group('schema').lower()
# Some basic validation
if schema not in attachment.SCHEMA_MAP:
logger.warning('Unsupported schema {}.'.format(schema))
return None
# Parse our url details of the server object as dictionary containing
# all of the information parsed from our URL
results = attachment.SCHEMA_MAP[schema].parse_url(url)
if not results:
# Failed to parse the server URL
logger.warning('Unparseable URL {}.'.format(url))
return None
# Prepare our Asset Object
results['asset'] = \
asset if isinstance(asset, AppriseAsset) else AppriseAsset()
if cache is not None:
# Force an over-ride of the cache value to what we have specified
results['cache'] = cache
if suppress_exceptions:
try:
# Attempt to create an instance of our plugin using the parsed
# URL information
attach_plugin = \
attachment.SCHEMA_MAP[results['schema']](**results)
except Exception:
# the arguments are invalid or can not be used.
logger.warning('Could not load URL: %s' % url)
return None
else:
# Attempt to create an instance of our plugin using the parsed
# URL information but don't wrap it in a try catch
attach_plugin = attachment.SCHEMA_MAP[results['schema']](**results)
return attach_plugin
def clear(self):
"""
Empties our attachment list
"""
self.attachments[:] = []
def size(self):
"""
Returns the total size of accumulated attachments
"""
return sum([len(a) for a in self.attachments if len(a) > 0])
def pop(self, index=-1):
"""
Removes an indexed Apprise Attachment from the stack and returns it.
by default the last element is poped from the list
"""
# Remove our entry
return self.attachments.pop(index)
def __getitem__(self, index):
"""
Returns the indexed entry of a loaded apprise attachments
"""
return self.attachments[index]
def __bool__(self):
"""
Allows the Apprise object to be wrapped in an Python 3.x based 'if
statement'. True is returned if at least one service has been loaded.
"""
return True if self.attachments else False
def __nonzero__(self):
"""
Allows the Apprise object to be wrapped in an Python 2.x based 'if
statement'. True is returned if at least one service has been loaded.
"""
return True if self.attachments else False
def __iter__(self):
"""
Returns an iterator to our attachment list
"""
return iter(self.attachments)
def __len__(self):
"""
Returns the number of attachment entries loaded
"""
return len(self.attachments)

View File

@ -46,10 +46,12 @@ from .URLBase import URLBase
from .URLBase import PrivacyMode
from .plugins.NotifyBase import NotifyBase
from .config.ConfigBase import ConfigBase
from .attachment.AttachBase import AttachBase
from .Apprise import Apprise
from .AppriseAsset import AppriseAsset
from .AppriseConfig import AppriseConfig
from .AppriseAttachment import AppriseAttachment
# Set default logging handler to avoid "No handler found" warnings.
import logging
@ -58,8 +60,8 @@ logging.getLogger(__name__).addHandler(NullHandler())
__all__ = [
# Core
'Apprise', 'AppriseAsset', 'AppriseConfig', 'URLBase', 'NotifyBase',
'ConfigBase',
'Apprise', 'AppriseAsset', 'AppriseConfig', 'AppriseAttachment', 'URLBase',
'NotifyBase', 'ConfigBase', 'AttachBase',
# Reference
'NotifyType', 'NotifyImageSize', 'NotifyFormat', 'OverflowMode',

View File

@ -0,0 +1,330 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import os
import time
import mimetypes
from ..URLBase import URLBase
from ..utils import parse_bool
class AttachBase(URLBase):
"""
This is the base class for all supported attachment types
"""
# For attachment type detection; this amount of data is read into memory
# 128KB (131072B)
max_detect_buffer_size = 131072
# Unknown mimetype
unknown_mimetype = 'application/octet-stream'
# Our filename when we can't otherwise determine one
unknown_filename = 'apprise-attachment'
# Our filename extension when we can't otherwise determine one
unknown_filename_extension = '.obj'
# The strict argument is a flag specifying whether the list of known MIME
# types is limited to only the official types registered with IANA. When
# strict is True, only the IANA types are supported; when strict is False
# (the default), some additional non-standard but commonly used MIME types
# are also recognized.
strict = False
# The maximum file-size we will accept for an attachment size. If this is
# set to zero (0), then no check is performed
# 1 MB = 1048576 bytes
# 5 MB = 5242880 bytes
max_file_size = 5242880
def __init__(self, name=None, mimetype=None, cache=True, **kwargs):
"""
Initialize some general logging and common server arguments that will
keep things consistent when working with the configurations that
inherit this class.
Optionally provide a filename to over-ride name associated with the
actual file retrieved (from where-ever).
The mime-type is automatically detected, but you can over-ride this by
explicitly stating what it should be.
By default we cache our responses so that subsiquent calls does not
cause the content to be retrieved again. For local file references
this makes no difference at all. But for remote content, this does
mean more then one call can be made to retrieve the (same) data. This
method can be somewhat inefficient if disabled. Only disable caching
if you understand the consequences.
You can alternatively set the cache value to an int identifying the
number of seconds the previously retrieved can exist for before it
should be considered expired.
"""
super(AttachBase, self).__init__(**kwargs)
if not mimetypes.inited:
# Ensure mimetypes has been initialized
mimetypes.init()
# Attach Filename (does not have to be the same as path)
self._name = name
# The mime type of the attached content. This is detected if not
# otherwise specified.
self._mimetype = mimetype
# The detected_mimetype, this is only used as a fallback if the
# mimetype wasn't forced by the user
self.detected_mimetype = None
# The detected filename by calling child class. A detected filename
# is always used if no force naming was specified.
self.detected_name = None
# Absolute path to attachment
self.download_path = None
# Set our cache flag
# it can be True, or an integer
try:
self.cache = cache if isinstance(cache, bool) else int(cache)
if self.cache < 0:
raise ValueError()
except (ValueError, TypeError):
err = 'An invalid cache value ({}) was specified.'.format(cache)
self.logger.warning(err)
raise TypeError(err)
# Validate mimetype if specified
if self._mimetype:
if next((t for t in mimetypes.types_map.values()
if self._mimetype == t), None) is None:
err = 'An invalid mime-type ({}) was specified.'.format(
mimetype)
self.logger.warning(err)
raise TypeError(err)
return
@property
def path(self):
"""
Returns the absolute path to the filename. If this is not known or
is know but has been considered expired (due to cache setting), then
content is re-retrieved prior to returning.
"""
if not self.exists():
# we could not obtain our path
return None
return self.download_path
@property
def name(self):
"""
Returns the filename
"""
if self._name:
# return our fixed content
return self._name
if not self.exists():
# we could not obtain our name
return None
if not self.detected_name:
# If we get here, our download was successful but we don't have a
# filename based on our content.
extension = mimetypes.guess_extension(self.mimetype)
self.detected_name = '{}{}'.format(
self.unknown_filename,
extension if extension else self.unknown_filename_extension)
return self.detected_name
@property
def mimetype(self):
"""
Returns mime type (if one is present).
Content is cached once determied to prevent overhead of future
calls.
"""
if self._mimetype:
# return our pre-calculated cached content
return self._mimetype
if not self.exists():
# we could not obtain our attachment
return None
if not self.detected_mimetype:
# guess_type() returns: (type, encoding) and sets type to None
# if it can't otherwise determine it.
try:
# Directly reference _name and detected_name to prevent
# recursion loop (as self.name calls this function)
self.detected_mimetype, _ = mimetypes.guess_type(
self._name if self._name
else self.detected_name, strict=self.strict)
except TypeError:
# Thrown if None was specified in filename section
pass
# Return our mime type
return self.detected_mimetype \
if self.detected_mimetype else self.unknown_mimetype
def exists(self):
"""
Simply returns true if the object has downloaded and stored the
attachment AND the attachment has not expired.
"""
if self.download_path and os.path.isfile(self.download_path) \
and self.cache:
# We have enough reason to look further into our cached value
if self.cache is True:
# return our fixed content as is; we will always cache it
return True
# Verify our cache time to determine whether we will get our
# content again.
try:
age_in_sec = time.time() - os.stat(self.download_path).st_mtime
if age_in_sec <= self.cache:
return True
except (OSError, IOError):
# The file is not present
pass
return self.download()
def invalidate(self):
"""
Release any temporary data that may be open by child classes.
Externally fetched content should be automatically cleaned up when
this function is called.
This function should also reset the following entries to None:
- detected_name : Should identify a human readable filename
- download_path: Must contain a absolute path to content
- detected_mimetype: Should identify mimetype of content
"""
self.detected_name = None
self.download_path = None
self.detected_mimetype = None
return
def download(self):
"""
This function must be over-ridden by inheriting classes.
Inherited classes MUST populate:
- detected_name: Should identify a human readable filename
- download_path: Must contain a absolute path to content
- detected_mimetype: Should identify mimetype of content
If a download fails, you should ensure these values are set to None.
"""
raise NotImplementedError(
"download() is implimented by the child class.")
@staticmethod
def parse_url(url, verify_host=True, mimetype_db=None):
"""Parses the URL and returns it broken apart into a dictionary.
This is very specific and customized for Apprise.
Args:
url (str): The URL you want to fully parse.
verify_host (:obj:`bool`, optional): a flag kept with the parsed
URL which some child classes will later use to verify SSL
keys (if SSL transactions take place). Unless under very
specific circumstances, it is strongly recomended that
you leave this default value set to True.
Returns:
A dictionary is returned containing the URL fully parsed if
successful, otherwise None is returned.
"""
results = URLBase.parse_url(url, verify_host=verify_host)
if not results:
# We're done; we failed to parse our url
return results
# Allow overriding the default config mime type
if 'mime' in results['qsd']:
results['mimetype'] = results['qsd'].get('mime', '') \
.strip().lower()
# Allow overriding the default file name
if 'name' in results['qsd']:
results['name'] = results['qsd'].get('name', '') \
.strip().lower()
# Our cache value
if 'cache' in results['qsd']:
# First try to get it's integer value
try:
results['cache'] = int(results['qsd']['cache'])
except (ValueError, TypeError):
# No problem, it just isn't an integer; now treat it as a bool
# instead:
results['cache'] = parse_bool(results['qsd']['cache'])
return results
def __len__(self):
"""
Returns the filesize of the attachment.
"""
return os.path.getsize(self.path) if self.path else 0
def __bool__(self):
"""
Allows the Apprise object to be wrapped in an Python 3.x based 'if
statement'. True is returned if our content was downloaded correctly.
"""
return True if self.path else False
def __nonzero__(self):
"""
Allows the Apprise object to be wrapped in an Python 2.x based 'if
statement'. True is returned if our content was downloaded correctly.
"""
return True if self.path else False

View File

@ -0,0 +1,129 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import re
import os
from .AttachBase import AttachBase
from ..AppriseLocale import gettext_lazy as _
class AttachFile(AttachBase):
"""
A wrapper for File based attachment sources
"""
# The default descriptive name associated with the service
service_name = _('Local File')
# The default protocol
protocol = 'file'
def __init__(self, path, **kwargs):
"""
Initialize Local File Attachment Object
"""
super(AttachFile, self).__init__(**kwargs)
# Store path but mark it dirty since we have not performed any
# verification at this point.
self.dirty_path = os.path.expanduser(path)
return
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {}
if self._mimetype:
# A mime-type was enforced
args['mime'] = self._mimetype
if self._name:
# A name was enforced
args['name'] = self._name
return 'file://{path}{args}'.format(
path=self.quote(self.dirty_path),
args='?{}'.format(self.urlencode(args)) if args else '',
)
def download(self, **kwargs):
"""
Perform retrieval of our data.
For file base attachments, our data already exists, so we only need to
validate it.
"""
# Ensure any existing content set has been invalidated
self.invalidate()
if not os.path.isfile(self.dirty_path):
return False
if self.max_file_size > 0 and \
os.path.getsize(self.dirty_path) > self.max_file_size:
# The content to attach is to large
self.logger.error(
'Content exceeds allowable maximum file length '
'({}KB): {}'.format(
int(self.max_file_size / 1024), self.url(privacy=True)))
# Return False (signifying a failure)
return False
# We're good to go if we get here. Set our minimum requirements of
# a call do download() before returning a success
self.download_path = self.dirty_path
self.detected_name = os.path.basename(self.download_path)
# We don't need to set our self.detected_mimetype as it can be
# pulled at the time it's needed based on the detected_name
return True
@staticmethod
def parse_url(url):
"""
Parses the URL so that we can handle all different file paths
and return it as our path object
"""
results = AttachBase.parse_url(url, verify_host=False)
if not results:
# We're done early; it's not a good URL
return results
match = re.match(r'file://(?P<path>[^?]+)(\?.*)?', url, re.I)
if not match:
return None
results['path'] = AttachFile.unquote(match.group('path'))
return results

View File

@ -0,0 +1,321 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import re
import os
import six
import requests
from tempfile import NamedTemporaryFile
from .AttachBase import AttachBase
from ..URLBase import PrivacyMode
from ..AppriseLocale import gettext_lazy as _
class AttachHTTP(AttachBase):
"""
A wrapper for HTTP based attachment sources
"""
# The default descriptive name associated with the service
service_name = _('Web Based')
# The default protocol
protocol = 'http'
# The default secure protocol
secure_protocol = 'https'
# The maximum number of seconds to wait for a connection to be established
# before out-right just giving up
connection_timeout_sec = 5.0
# The number of bytes in memory to read from the remote source at a time
chunk_size = 8192
def __init__(self, headers=None, **kwargs):
"""
Initialize HTTP Object
headers can be a dictionary of key/value pairs that you want to
additionally include as part of the server headers to post with
"""
super(AttachHTTP, self).__init__(**kwargs)
self.schema = 'https' if self.secure else 'http'
self.fullpath = kwargs.get('fullpath')
if not isinstance(self.fullpath, six.string_types):
self.fullpath = '/'
self.headers = {}
if headers:
# Store our extra headers
self.headers.update(headers)
# Where our content is written to upon a call to download.
self._temp_file = None
return
def download(self, **kwargs):
"""
Perform retrieval of the configuration based on the specified request
"""
# Ensure any existing content set has been invalidated
self.invalidate()
# prepare header
headers = {
'User-Agent': self.app_id,
}
# Apply any/all header over-rides defined
headers.update(self.headers)
auth = None
if self.user:
auth = (self.user, self.password)
url = '%s://%s' % (self.schema, self.host)
if isinstance(self.port, int):
url += ':%d' % self.port
url += self.fullpath
self.logger.debug('HTTP POST URL: %s (cert_verify=%r)' % (
url, self.verify_certificate,
))
# Where our request object will temporarily live.
r = None
# Always call throttle before any remote server i/o is made
self.throttle()
try:
# Make our request
with requests.get(
url,
headers=headers,
auth=auth,
verify=self.verify_certificate,
timeout=self.connection_timeout_sec,
stream=True) as r:
# Handle Errors
r.raise_for_status()
# Get our file-size (if known)
try:
file_size = int(r.headers.get('Content-Length', '0'))
except (TypeError, ValueError):
# Handle edge case where Content-Length is a bad value
file_size = 0
# Perform a little Q/A on file limitations and restrictions
if self.max_file_size > 0 and file_size > self.max_file_size:
# The content retrieved is to large
self.logger.error(
'HTTP response exceeds allowable maximum file length '
'({}KB): {}'.format(
int(self.max_file_size / 1024),
self.url(privacy=True)))
# Return False (signifying a failure)
return False
# Detect config format based on mime if the format isn't
# already enforced
self.detected_mimetype = r.headers.get('Content-Type')
d = r.headers.get('Content-Disposition', '')
result = re.search(
"filename=['\"]?(?P<name>[^'\"]+)['\"]?", d, re.I)
if result:
self.detected_name = result.group('name').strip()
# Create a temporary file to work with
self._temp_file = NamedTemporaryFile()
# Get our chunk size
chunk_size = self.chunk_size
# Track all bytes written to disk
bytes_written = 0
# If we get here, we can now safely write our content to disk
for chunk in r.iter_content(chunk_size=chunk_size):
# filter out keep-alive chunks
if chunk:
self._temp_file.write(chunk)
bytes_written = self._temp_file.tell()
# Prevent a case where Content-Length isn't provided
# we don't want to fetch beyond our limits
if self.max_file_size > 0:
if bytes_written > self.max_file_size:
# The content retrieved is to large
self.logger.error(
'HTTP response exceeds allowable maximum '
'file length ({}KB): {}'.format(
int(self.max_file_size / 1024),
self.url(privacy=True)))
# Invalidate any variables previously set
self.invalidate()
# Return False (signifying a failure)
return False
elif bytes_written + chunk_size \
> self.max_file_size:
# Adjust out next read to accomodate up to our
# limit +1. This will prevent us from readig
# to much into our memory buffer
self.max_file_size - bytes_written + 1
# Ensure our content is flushed to disk for post-processing
self._temp_file.flush()
# Set our minimum requirements for a successful download() call
self.download_path = self._temp_file.name
if not self.detected_name:
self.detected_name = os.path.basename(self.fullpath)
except requests.RequestException as e:
self.logger.error(
'A Connection error occured retrieving HTTP '
'configuration from %s.' % self.host)
self.logger.debug('Socket Exception: %s' % str(e))
# Invalidate any variables previously set
self.invalidate()
# Return False (signifying a failure)
return False
except (IOError, OSError):
# IOError is present for backwards compatibility with Python
# versions older then 3.3. >= 3.3 throw OSError now.
# Could not open and/or write the temporary file
self.logger.error(
'Could not write attachment to disk: {}'.format(
self.url(privacy=True)))
# Invalidate any variables previously set
self.invalidate()
# Return False (signifying a failure)
return False
# Return our success
return True
def invalidate(self):
"""
Close our temporary file
"""
if self._temp_file:
self._temp_file.close()
self._temp_file = None
super(AttachHTTP, self).invalidate()
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Prepare our cache value
if isinstance(self.cache, bool) or not self.cache:
cache = 'yes' if self.cache else 'no'
else:
cache = int(self.cache)
# Define any arguments set
args = {
'verify': 'yes' if self.verify_certificate else 'no',
'cache': cache,
}
if self._mimetype:
# A format was enforced
args['mime'] = self._mimetype
if self._name:
# A name was enforced
args['name'] = self._name
# Append our headers into our args
args.update({'+{}'.format(k): v for k, v in self.headers.items()})
# Determine Authentication
auth = ''
if self.user and self.password:
auth = '{user}:{password}@'.format(
user=self.quote(self.user, safe=''),
password=self.pprint(
self.password, privacy, mode=PrivacyMode.Secret, safe=''),
)
elif self.user:
auth = '{user}@'.format(
user=self.quote(self.user, safe=''),
)
default_port = 443 if self.secure else 80
return '{schema}://{auth}{hostname}{port}{fullpath}/?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=self.quote(self.host, safe=''),
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
fullpath=self.quote(self.fullpath, safe='/'),
args=self.urlencode(args),
)
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = AttachBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Add our headers that the user can potentially over-ride if they wish
# to to our returned result set
results['headers'] = results['qsd-']
results['headers'].update(results['qsd+'])
return results

View File

@ -0,0 +1,119 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import six
import re
from os import listdir
from os.path import dirname
from os.path import abspath
# Maintains a mapping of all of the attachment services
SCHEMA_MAP = {}
__all__ = []
# Load our Lookup Matrix
def __load_matrix(path=abspath(dirname(__file__)), name='apprise.attachment'):
"""
Dynamically load our schema map; this allows us to gracefully
skip over modules we simply don't have the dependencies for.
"""
# Used for the detection of additional Attachment Services objects
# The .py extension is optional as we support loading directories too
module_re = re.compile(r'^(?P<name>Attach[a-z0-9]+)(\.py)?$', re.I)
for f in listdir(path):
match = module_re.match(f)
if not match:
# keep going
continue
# Store our notification/plugin name:
plugin_name = match.group('name')
try:
module = __import__(
'{}.{}'.format(name, plugin_name),
globals(), locals(),
fromlist=[plugin_name])
except ImportError:
# No problem, we can't use this object
continue
if not hasattr(module, plugin_name):
# Not a library we can load as it doesn't follow the simple rule
# that the class must bear the same name as the notification
# file itself.
continue
# Get our plugin
plugin = getattr(module, plugin_name)
if not hasattr(plugin, 'app_id'):
# Filter out non-notification modules
continue
elif plugin_name in __all__:
# we're already handling this object
continue
# Add our module name to our __all__
__all__.append(plugin_name)
# Ensure we provide the class as the reference to this directory and
# not the module:
globals()[plugin_name] = plugin
# Load protocol(s) if defined
proto = getattr(plugin, 'protocol', None)
if isinstance(proto, six.string_types):
if proto not in SCHEMA_MAP:
SCHEMA_MAP[proto] = plugin
elif isinstance(proto, (set, list, tuple)):
# Support iterables list types
for p in proto:
if p not in SCHEMA_MAP:
SCHEMA_MAP[p] = plugin
# Load secure protocol(s) if defined
protos = getattr(plugin, 'secure_protocol', None)
if isinstance(protos, six.string_types):
if protos not in SCHEMA_MAP:
SCHEMA_MAP[protos] = plugin
if isinstance(protos, (set, list, tuple)):
# Support iterables list types
for p in protos:
if p not in SCHEMA_MAP:
SCHEMA_MAP[p] = plugin
return SCHEMA_MAP
# Dynamically build our schema base
__load_matrix()

View File

@ -99,6 +99,9 @@ def print_version_msg():
@click.option('--config', '-c', default=None, type=str, multiple=True,
metavar='CONFIG_URL',
help='Specify one or more configuration locations.')
@click.option('--attach', '-a', default=None, type=str, multiple=True,
metavar='ATTACHMENT_URL',
help='Specify one or more configuration locations.')
@click.option('--notification-type', '-n', default=NotifyType.INFO, type=str,
metavar='TYPE',
help='Specify the message type (default=info). Possible values'
@ -120,8 +123,8 @@ def print_version_msg():
help='Display the apprise version and exit.')
@click.argument('urls', nargs=-1,
metavar='SERVER_URL [SERVER_URL2 [SERVER_URL3]]',)
def main(body, title, config, urls, notification_type, theme, tag, dry_run,
verbose, version):
def main(body, title, config, attach, urls, notification_type, theme, tag,
dry_run, verbose, version):
"""
Send a notification to all of the specified servers identified by their
URLs the content provided within the title, body and notification-type.
@ -200,7 +203,8 @@ def main(body, title, config, urls, notification_type, theme, tag, dry_run,
# now print it out
result = a.notify(
body=body, title=title, notify_type=notification_type, tag=tags)
body=body, title=title, notify_type=notification_type, tag=tags,
attach=attach)
else:
# Number of rows to assume in the terminal. In future, maybe this can
# be detected and made dynamic. The actual row count is 80, but 5

View File

@ -33,6 +33,7 @@ from ..common import NOTIFY_FORMATS
from ..common import OverflowMode
from ..common import OVERFLOW_MODES
from ..AppriseLocale import gettext_lazy as _
from ..AppriseAttachment import AppriseAttachment
class NotifyBase(URLBase):
@ -241,12 +242,21 @@ class NotifyBase(URLBase):
)
def notify(self, body, title=None, notify_type=NotifyType.INFO,
overflow=None, **kwargs):
overflow=None, attach=None, **kwargs):
"""
Performs notification
"""
# Prepare attachments if required
if attach is not None and not isinstance(attach, AppriseAttachment):
try:
attach = AppriseAttachment(attach, asset=self.asset)
except TypeError:
# bad attachments
return False
# Handle situations where the title is None
title = '' if not title else title
@ -255,7 +265,7 @@ class NotifyBase(URLBase):
overflow=overflow):
# Send notification
if not self.send(body=chunk['body'], title=chunk['title'],
notify_type=notify_type):
notify_type=notify_type, attach=attach):
# Toggle our return status flag
return False

View File

@ -42,7 +42,6 @@
#
import re
import requests
from json import dumps
from .NotifyBase import NotifyBase
from ..common import NotifyImageSize
@ -178,17 +177,12 @@ class NotifyDiscord(NotifyBase):
return
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, attach=None,
**kwargs):
"""
Perform Discord Notification
"""
headers = {
'User-Agent': self.app_id,
'Content-Type': 'multipart/form-data',
}
# Prepare JSON Object
payload = {
# Text-To-Speech
'tts': self.tts,
@ -258,6 +252,50 @@ class NotifyDiscord(NotifyBase):
# Optionally override the default username of the webhook
payload['username'] = self.user
if not self._send(payload):
# We failed to post our message
return False
if attach:
# Update our payload; the idea is to preserve it's other detected
# and assigned values for re-use here too
payload.update({
# Text-To-Speech
'tts': False,
# Wait until the upload has posted itself before continuing
'wait': True,
})
# Remove our text/title based content for attachment use
if 'embeds' in payload:
# Markdown
del payload['embeds']
if 'content' in payload:
# Markdown
del payload['content']
# Send our attachments
for attachment in attach:
self.logger.info(
'Posting Discord Attachment {}'.format(attachment.name))
if not self._send(payload, attach=attachment):
# We failed to post our message
return False
# Otherwise return
return True
def _send(self, payload, attach=None, **kwargs):
"""
Wrapper to the requests (post) object
"""
# Our headers
headers = {
'User-Agent': self.app_id,
}
# Construct Notify URL
notify_url = '{0}/{1}/{2}'.format(
self.notify_url,
@ -273,11 +311,19 @@ class NotifyDiscord(NotifyBase):
# Always call throttle before any remote server i/o is made
self.throttle()
# Our attachment path (if specified)
files = None
try:
# Open our attachment path if required:
if attach:
files = (attach.name, open(attach.path, 'rb'))
r = requests.post(
notify_url,
data=dumps(payload),
data=payload,
headers=headers,
files=files,
verify=self.verify_certificate,
)
if r.status_code not in (
@ -288,8 +334,9 @@ class NotifyDiscord(NotifyBase):
NotifyBase.http_response_code_lookup(r.status_code)
self.logger.warning(
'Failed to send Discord notification: '
'Failed to send {}to Discord notification: '
'{}{}error={}.'.format(
attach.name if attach else '',
status_str,
', ' if status_str else '',
r.status_code))
@ -304,12 +351,24 @@ class NotifyDiscord(NotifyBase):
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending Discord '
'notification.'
)
'A Connection error occured posting {}to Discord.'.format(
attach.name if attach else ''))
self.logger.debug('Socket Exception: %s' % str(e))
return False
except (OSError, IOError) as e:
self.logger.warning(
'An I/O error occured while reading {}.'.format(
attach.name if attach else 'attachment'))
self.logger.debug('I/O Exception: %s' % str(e))
return False
finally:
# Close our file (if it's open) stored in the second element
# of our files tuple (index 1)
if files:
files[1].close()
return True
def url(self, privacy=False, *args, **kwargs):

View File

@ -27,6 +27,9 @@ import re
import six
import smtplib
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from socket import error as SocketError
from datetime import datetime
@ -509,7 +512,8 @@ class NotifyEmail(NotifyBase):
break
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, attach=None,
**kwargs):
"""
Perform Email Notification
"""
@ -551,18 +555,55 @@ class NotifyEmail(NotifyBase):
# Prepare Email Message
if self.notify_format == NotifyFormat.HTML:
email = MIMEText(body, 'html')
content = MIMEText(body, 'html')
else:
email = MIMEText(body, 'plain')
content = MIMEText(body, 'plain')
email['Subject'] = title
email['From'] = '{} <{}>'.format(from_name, self.from_addr)
email['To'] = to_addr
email['Cc'] = ','.join(cc)
email['Date'] = \
base = MIMEMultipart() if attach else content
base['Subject'] = title
base['From'] = '{} <{}>'.format(from_name, self.from_addr)
base['To'] = to_addr
base['Cc'] = ','.join(cc)
base['Date'] = \
datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S +0000")
email['X-Application'] = self.app_id
base['X-Application'] = self.app_id
if attach:
# First attach our body to our content as the first element
base.attach(content)
attach_error = False
# Now store our attachments
for attachment in attach:
if not attachment:
# We could not load the attachment; take an early
# exit since this isn't what the end user wanted
self.logger.warning(
'The specified attachment could not be referenced:'
' {}.'.format(attachment.url(privacy=True)))
# Mark our failure
attach_error = True
break
with open(attachment.path, "rb") as abody:
app = MIMEApplication(
abody.read(), attachment.mimetype)
app.add_header(
'Content-Disposition',
'attachment; filename="{}"'.format(
attachment.name))
base.attach(app)
if attach_error:
# Mark our error and quit early
has_error = True
break
# bind the socket variable to the current namespace
socket = None
@ -598,7 +639,7 @@ class NotifyEmail(NotifyBase):
socket.sendmail(
self.from_addr,
[to_addr] + list(cc) + list(bcc),
email.as_string())
base.as_string())
self.logger.info(
'Sent Email notification to "{}".'.format(to_addr))

View File

@ -23,21 +23,41 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# To use this plugin, you need to first access https://api.slack.com
# Specifically https://my.slack.com/services/new/incoming-webhook/
# to create a new incoming webhook for your account. You'll need to
# follow the wizard to pre-determine the channel(s) you want your
# message to broadcast to, and when you're complete, you will
# recieve a URL that looks something like this:
# https://hooks.slack.com/services/T1JJ3T3L2/A1BRTD4JD/TIiajkdnlazkcOXrIdevi7FQ
# ^ ^ ^
# | | |
# These are important <--------------^---------^---------------^
# There are 2 ways to use this plugin...
# Method 1: Via Webhook:
# Visit https://my.slack.com/services/new/incoming-webhook/
# to create a new incoming webhook for your account. You'll need to
# follow the wizard to pre-determine the channel(s) you want your
# message to broadcast to, and when you're complete, you will
# recieve a URL that looks something like this:
# https://hooks.slack.com/services/T1JJ3T3L2/A1BRTD4JD/TIiajkdnlazkcOXrIdevi7
# ^ ^ ^
# | | |
# These are important <--------------^---------^---------------^
#
# Method 2: Via a Bot:
# 1. visit: https://api.slack.com/apps?new_app=1
# 2. Pick an App Name (such as Apprise) and select your workspace. Then
# press 'Create App'
# 3. You'll be able to click on 'Bots' from here where you can then choose
# to add a 'Bot User'. Give it a name and choose 'Add Bot User'.
# 4. Now you can choose 'Install App' to which you can choose 'Install App
# to Workspace'.
# 5. You will need to authorize the app which you get promopted to do.
# 6. Finally you'll get some important information providing you your
# 'OAuth Access Token' and 'Bot User OAuth Access Token' such as:
# slack://{Oauth Access Token}
#
# ... which might look something like:
# slack://xoxp-1234-1234-1234-4ddbc191d40ee098cbaae6f3523ada2d
# ... or:
# slack://xoxb-1234-1234-4ddbc191d40ee098cbaae6f3523ada2d
#
import re
import requests
from json import dumps
from json import loads
from time import time
from .NotifyBase import NotifyBase
@ -58,6 +78,27 @@ SLACK_HTTP_ERROR_MAP = {
CHANNEL_LIST_DELIM = re.compile(r'[ \t\r\n,#\\/]+')
class SlackMode(object):
"""
Tracks the mode of which we're using Slack
"""
# We're dealing with a webhook
# Our token looks like: T1JJ3T3L2/A1BRTD4JD/TIiajkdnlazkcOXrIdevi7
WEBHOOK = 'webhook'
# We're dealing with a bot (using the OAuth Access Token)
# Our token looks like: xoxp-1234-1234-1234-abc124 or
# Our token looks like: xoxb-1234-1234-abc124 or
BOT = 'bot'
# Define our Slack Modes
SLACK_MODES = (
SlackMode.WEBHOOK,
SlackMode.BOT,
)
class NotifySlack(NotifyBase):
"""
A wrapper for Slack Notifications
@ -72,27 +113,43 @@ class NotifySlack(NotifyBase):
# The default secure protocol
secure_protocol = 'slack'
# Allow 50 requests per minute (Tier 2).
# 60/50 = 0.2
request_rate_per_sec = 1.2
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_slack'
# Slack uses the http protocol with JSON requests
notify_url = 'https://hooks.slack.com/services'
# Slack Webhook URL
webhook_url = 'https://hooks.slack.com/services'
# Slack API URL (used with Bots)
api_url = 'https://slack.com/api/{}'
# Allows the user to specify the NotifyImageSize object
image_size = NotifyImageSize.XY_72
# The maximum allowable characters allowed in the body per message
body_maxlen = 1000
body_maxlen = 35000
# Default Notification Format
notify_format = NotifyFormat.MARKDOWN
# Bot's do not have default channels to notify; so #general
# becomes the default channel in BOT mode
default_notification_channel = '#general'
# Define object templates
templates = (
# Webhook
'{schema}://{token_a}/{token_b}{token_c}',
'{schema}://{botname}@{token_a}/{token_b}{token_c}',
'{schema}://{token_a}/{token_b}{token_c}/{targets}',
'{schema}://{botname}@{token_a}/{token_b}{token_c}/{targets}',
# Bot
'{schema}://{access_token}/',
'{schema}://{access_token}/{targets}',
)
# Define our template tokens
@ -102,7 +159,17 @@ class NotifySlack(NotifyBase):
'type': 'string',
'map_to': 'user',
},
# Token required as part of the API request
# Bot User OAuth Access Token
# which always starts with xoxp- e.g.:
# xoxb-1234-1234-4ddbc191d40ee098cbaae6f3523ada2d
'access_token': {
'name': _('OAuth Access Token'),
'type': 'string',
'private': True,
'required': True,
'regex': (r'^xox[abp]-[A-Z0-9-]+$', 'i'),
},
# Token required as part of the Webhook request
# /AAAAAAAAA/........./........................
'token_a': {
'name': _('Token A'),
@ -111,7 +178,7 @@ class NotifySlack(NotifyBase):
'required': True,
'regex': (r'^[A-Z0-9]{9}$', 'i'),
},
# Token required as part of the API request
# Token required as part of the Webhook request
# /........./BBBBBBBBB/........................
'token_b': {
'name': _('Token B'),
@ -120,7 +187,7 @@ class NotifySlack(NotifyBase):
'required': True,
'regex': (r'^[A-Z0-9]{9}$', 'i'),
},
# Token required as part of the API request
# Token required as part of the Webhook request
# /........./........./CCCCCCCCCCCCCCCCCCCCCCCC
'token_c': {
'name': _('Token C'),
@ -161,41 +228,60 @@ class NotifySlack(NotifyBase):
'default': True,
'map_to': 'include_image',
},
'footer': {
'name': _('Include Footer'),
'type': 'bool',
'default': True,
'map_to': 'include_footer',
},
'to': {
'alias_of': 'targets',
},
})
def __init__(self, token_a, token_b, token_c, targets,
include_image=True, **kwargs):
def __init__(self, access_token=None, token_a=None, token_b=None,
token_c=None, targets=None, include_image=True,
include_footer=True, **kwargs):
"""
Initialize Slack Object
"""
super(NotifySlack, self).__init__(**kwargs)
self.token_a = validate_regex(
token_a, *self.template_tokens['token_a']['regex'])
if not self.token_a:
msg = 'An invalid Slack (first) Token ' \
'({}) was specified.'.format(token_a)
self.logger.warning(msg)
raise TypeError(msg)
# Setup our mode
self.mode = SlackMode.BOT if access_token else SlackMode.WEBHOOK
self.token_b = validate_regex(
token_b, *self.template_tokens['token_b']['regex'])
if not self.token_b:
msg = 'An invalid Slack (second) Token ' \
'({}) was specified.'.format(token_b)
self.logger.warning(msg)
raise TypeError(msg)
if self.mode is SlackMode.WEBHOOK:
self.token_a = validate_regex(
token_a, *self.template_tokens['token_a']['regex'])
if not self.token_a:
msg = 'An invalid Slack (first) Token ' \
'({}) was specified.'.format(token_a)
self.logger.warning(msg)
raise TypeError(msg)
self.token_c = validate_regex(
token_c, *self.template_tokens['token_c']['regex'])
if not self.token_c:
msg = 'An invalid Slack (third) Token ' \
'({}) was specified.'.format(token_c)
self.logger.warning(msg)
raise TypeError(msg)
self.token_b = validate_regex(
token_b, *self.template_tokens['token_b']['regex'])
if not self.token_b:
msg = 'An invalid Slack (second) Token ' \
'({}) was specified.'.format(token_b)
self.logger.warning(msg)
raise TypeError(msg)
self.token_c = validate_regex(
token_c, *self.template_tokens['token_c']['regex'])
if not self.token_c:
msg = 'An invalid Slack (third) Token ' \
'({}) was specified.'.format(token_c)
self.logger.warning(msg)
raise TypeError(msg)
else:
self.access_token = validate_regex(
access_token, *self.template_tokens['access_token']['regex'])
if not self.access_token:
msg = 'An invalid Slack OAuth Access Token ' \
'({}) was specified.'.format(access_token)
self.logger.warning(msg)
raise TypeError(msg)
if not self.user:
self.logger.warning(
@ -207,7 +293,9 @@ class NotifySlack(NotifyBase):
# No problem; the webhook is smart enough to just notify the
# channel it was created for; adding 'None' is just used as
# a flag lower to not set the channels
self.channels.append(None)
self.channels.append(
None if self.mode is SlackMode.WEBHOOK
else self.default_notification_channel)
# Formatting requirements are defined here:
# https://api.slack.com/docs/message-formatting
@ -229,18 +317,16 @@ class NotifySlack(NotifyBase):
# Place a thumbnail image inline with the message body
self.include_image = include_image
# Place a footer with each post
self.include_footer = include_footer
return
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, attach=None,
**kwargs):
"""
Perform Slack Notification
"""
headers = {
'User-Agent': self.app_id,
'Content-Type': 'application/json',
}
# error tracking (used for function return)
has_error = False
@ -251,14 +337,8 @@ class NotifySlack(NotifyBase):
body = self._re_formatting_rules.sub( # pragma: no branch
lambda x: self._re_formatting_map[x.group()], body,
)
url = '%s/%s/%s/%s' % (
self.notify_url,
self.token_a,
self.token_b,
self.token_c,
)
# prepare JSON Object
# Prepare JSON Object (applicable to both WEBHOOK and BOT mode)
payload = {
'username': self.user if self.user else self.app_id,
# Use Markdown language
@ -269,18 +349,41 @@ class NotifySlack(NotifyBase):
'color': self.color(notify_type),
# Time
'ts': time(),
'footer': self.app_id,
}],
}
# Prepare our URL (depends on mode)
if self.mode is SlackMode.WEBHOOK:
url = '{}/{}/{}/{}'.format(
self.webhook_url,
self.token_a,
self.token_b,
self.token_c,
)
else: # SlackMode.BOT
url = self.api_url.format('chat.postMessage')
if self.include_footer:
# Include the footer only if specified to do so
payload['attachments'][0]['footer'] = self.app_id
if attach and self.mode is SlackMode.WEBHOOK:
# Be friendly; let the user know why they can't send their
# attachments if using the Webhook mode
self.logger.warning(
'Slack Webhooks do not support attachments.')
# Create a copy of the channel list
channels = list(self.channels)
attach_channel_list = []
while len(channels):
channel = channels.pop(0)
if channel is not None:
_channel = validate_regex(
channel, r'[+#@]?([A-Z0-9_]{1,32})')
channel, r'[+#@]?(?P<value>[A-Z0-9_]{1,32})')
if not _channel:
# Channel over-ride was specified
@ -304,70 +407,226 @@ class NotifySlack(NotifyBase):
# Prefix with channel hash tag
payload['channel'] = '#{}'.format(_channel)
# Store the valid and massaged payload that is recognizable by
# slack. This list is used for sending attachments later.
attach_channel_list.append(payload['channel'])
# Acquire our to-be footer icon if configured to do so
image_url = None if not self.include_image \
else self.image_url(notify_type)
if image_url:
payload['attachments'][0]['footer_icon'] = image_url
payload['icon_url'] = image_url
self.logger.debug('Slack POST URL: %s (cert_verify=%r)' % (
url, self.verify_certificate,
))
self.logger.debug('Slack Payload: %s' % str(payload))
if self.include_footer:
payload['attachments'][0]['footer_icon'] = image_url
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
url,
data=dumps(payload),
headers=headers,
verify=self.verify_certificate,
)
if r.status_code != requests.codes.ok:
# We had a problem
status_str = \
NotifySlack.http_response_code_lookup(
r.status_code, SLACK_HTTP_ERROR_MAP)
self.logger.warning(
'Failed to send Slack notification{}: '
'{}{}error={}.'.format(
' to {}'.format(channel)
if channel is not None else '',
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
# Mark our failure
has_error = True
continue
else:
self.logger.info(
'Sent Slack notification{}.'.format(
' to {}'.format(channel)
if channel is not None else ''))
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending Slack '
'notification{}.'.format(
' to {}'.format(channel)
if channel is not None else ''))
self.logger.debug('Socket Exception: %s' % str(e))
# Mark our failure
response = self._send(url, payload)
if not response:
# Handle any error
has_error = True
continue
self.logger.info(
'Sent Slack notification{}.'.format(
' to {}'.format(channel)
if channel is not None else ''))
if attach and self.mode is SlackMode.BOT and attach_channel_list:
# Send our attachments (can only be done in bot mode)
for attachment in attach:
self.logger.info(
'Posting Slack Attachment {}'.format(attachment.name))
# Prepare API Upload Payload
_payload = {
'filename': attachment.name,
'channels': ','.join(attach_channel_list)
}
# Our URL
_url = self.api_url.format('files.upload')
response = self._send(_url, _payload, attach=attachment)
if not (response and response.get('file') and
response['file'].get('url_private')):
# We failed to post our attachments, take an early exit
return False
return not has_error
def _send(self, url, payload, attach=None, **kwargs):
"""
Wrapper to the requests (post) object
"""
self.logger.debug('Slack POST URL: %s (cert_verify=%r)' % (
url, self.verify_certificate,
))
self.logger.debug('Slack Payload: %s' % str(payload))
headers = {
'User-Agent': self.app_id,
}
if not attach:
headers['Content-Type'] = 'application/json; charset=utf-8'
if self.mode is SlackMode.BOT:
headers['Authorization'] = 'Bearer {}'.format(self.access_token)
# Our response object
response = None
# Always call throttle before any remote server i/o is made
self.throttle()
# Our attachment path (if specified)
files = None
try:
# Open our attachment path if required:
if attach:
files = (attach.name, open(attach.path, 'rb'))
r = requests.post(
url,
data=payload if attach else dumps(payload),
headers=headers,
files=files,
verify=self.verify_certificate,
)
if r.status_code != requests.codes.ok:
# We had a problem
status_str = \
NotifySlack.http_response_code_lookup(
r.status_code, SLACK_HTTP_ERROR_MAP)
self.logger.warning(
'Failed to send {}to Slack: '
'{}{}error={}.'.format(
attach.name if attach else '',
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
return False
try:
response = loads(r.content)
except (AttributeError, ValueError):
# AttributeError means r.content was None
pass
if not (response and response.get('ok', True)):
# Bare minimum requirements not met
self.logger.warning(
'Failed to send {}to Slack: error={}.'.format(
attach.name if attach else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
return False
# Message Post Response looks like this:
# {
# "attachments": [
# {
# "color": "3AA3E3",
# "fallback": "test",
# "id": 1,
# "text": "my body",
# "title": "my title",
# "ts": 1573694687
# }
# ],
# "bot_id": "BAK4K23G5",
# "icons": {
# "image_48": "https://s3-us-west-2.amazonaws.com/...
# },
# "subtype": "bot_message",
# "text": "",
# "ts": "1573694689.003700",
# "type": "message",
# "username": "Apprise"
# }
# File Attachment Responses look like this
# {
# "file": {
# "channels": [],
# "comments_count": 0,
# "created": 1573617523,
# "display_as_bot": false,
# "editable": false,
# "external_type": "",
# "filetype": "png",
# "groups": [],
# "has_rich_preview": false,
# "id": "FQJJLDAHM",
# "image_exif_rotation": 1,
# "ims": [],
# "is_external": false,
# "is_public": false,
# "is_starred": false,
# "mimetype": "image/png",
# "mode": "hosted",
# "name": "apprise-test.png",
# "original_h": 640,
# "original_w": 640,
# "permalink": "https://{name}.slack.com/files/...
# "permalink_public": "https://slack-files.com/...
# "pretty_type": "PNG",
# "public_url_shared": false,
# "shares": {},
# "size": 238810,
# "thumb_160": "https://files.slack.com/files-tmb/...
# "thumb_360": "https://files.slack.com/files-tmb/...
# "thumb_360_h": 360,
# "thumb_360_w": 360,
# "thumb_480": "https://files.slack.com/files-tmb/...
# "thumb_480_h": 480,
# "thumb_480_w": 480,
# "thumb_64": "https://files.slack.com/files-tmb/...
# "thumb_80": "https://files.slack.com/files-tmb/...
# "thumb_tiny": abcd...
# "timestamp": 1573617523,
# "title": "apprise-test",
# "url_private": "https://files.slack.com/files-pri/...
# "url_private_download": "https://files.slack.com/files-...
# "user": "UADKLLMJT",
# "username": ""
# },
# "ok": true
# }
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured posting {}to Slack.'.format(
attach.name if attach else ''))
self.logger.debug('Socket Exception: %s' % str(e))
return False
except (OSError, IOError) as e:
self.logger.warning(
'An I/O error occured while reading {}.'.format(
attach.name if attach else 'attachment'))
self.logger.debug('I/O Exception: %s' % str(e))
return False
finally:
# Close our file (if it's open) stored in the second element
# of our files tuple (index 1)
if files:
files[1].close()
# Return the response for processing
return response
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
@ -378,23 +637,35 @@ class NotifySlack(NotifyBase):
'format': self.notify_format,
'overflow': self.overflow_mode,
'image': 'yes' if self.include_image else 'no',
'footer': 'yes' if self.include_footer else 'no',
'verify': 'yes' if self.verify_certificate else 'no',
}
# Determine if there is a botname present
botname = ''
if self.user:
botname = '{botname}@'.format(
botname=NotifySlack.quote(self.user, safe=''),
)
if self.mode == SlackMode.WEBHOOK:
# Determine if there is a botname present
botname = ''
if self.user:
botname = '{botname}@'.format(
botname=NotifySlack.quote(self.user, safe=''),
)
return '{schema}://{botname}{token_a}/{token_b}/{token_c}/{targets}/'\
return '{schema}://{botname}{token_a}/{token_b}/{token_c}/'\
'{targets}/?{args}'.format(
schema=self.secure_protocol,
botname=botname,
token_a=self.pprint(self.token_a, privacy, safe=''),
token_b=self.pprint(self.token_b, privacy, safe=''),
token_c=self.pprint(self.token_c, privacy, safe=''),
targets='/'.join(
[NotifySlack.quote(x, safe='')
for x in self.channels]),
args=NotifySlack.urlencode(args),
)
# else -> self.mode == SlackMode.BOT:
return '{schema}://{access_token}/{targets}/'\
'?{args}'.format(
schema=self.secure_protocol,
botname=botname,
token_a=self.pprint(self.token_a, privacy, safe=''),
token_b=self.pprint(self.token_b, privacy, safe=''),
token_c=self.pprint(self.token_c, privacy, safe=''),
access_token=self.pprint(self.access_token, privacy, safe=''),
targets='/'.join(
[NotifySlack.quote(x, safe='') for x in self.channels]),
args=NotifySlack.urlencode(args),
@ -407,32 +678,40 @@ class NotifySlack(NotifyBase):
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results
# The first token is stored in the hostname
token = NotifySlack.unquote(results['host'])
# Get unquoted entries
entries = NotifySlack.split_path(results['fullpath'])
# The first token is stored in the hostname
results['token_a'] = NotifySlack.unquote(results['host'])
# Verify if our token_a us a bot token or part of a webhook:
if token.startswith('xo'):
# We're dealing with a bot
results['access_token'] = token
# Now fetch the remaining tokens
try:
results['token_b'] = entries.pop(0)
else:
# We're dealing with a webhook
results['token_a'] = token
except IndexError:
# We're done
results['token_b'] = None
# Now fetch the remaining tokens
try:
results['token_b'] = entries.pop(0)
try:
results['token_c'] = entries.pop(0)
except IndexError:
# We're done
results['token_b'] = None
except IndexError:
# We're done
results['token_c'] = None
try:
results['token_c'] = entries.pop(0)
except IndexError:
# We're done
results['token_c'] = None
# assign remaining entries to the channels we wish to notify
results['targets'] = entries
@ -444,10 +723,14 @@ class NotifySlack(NotifyBase):
bool, CHANNEL_LIST_DELIM.split(
NotifySlack.unquote(results['qsd']['to'])))]
# Get Image
# Get Image Flag
results['include_image'] = \
parse_bool(results['qsd'].get('image', True))
# Get Footer Flag
results['include_footer'] = \
parse_bool(results['qsd'].get('footer', True))
return results
@staticmethod

View File

@ -33,9 +33,11 @@ import mock
from os import chmod
from os import getuid
from os.path import dirname
from os.path import join
from apprise import Apprise
from apprise import AppriseAsset
from apprise import AppriseAttachment
from apprise import NotifyBase
from apprise import NotifyType
from apprise import NotifyFormat
@ -54,6 +56,9 @@ import inspect
import logging
logging.disable(logging.CRITICAL)
# Attachment Directory
TEST_VAR_DIR = join(dirname(__file__), 'var')
def test_apprise():
"""
@ -166,7 +171,7 @@ def test_apprise():
# Support URL
return ''
def notify(self, **kwargs):
def send(self, **kwargs):
# Pretend everything is okay
return True
@ -200,6 +205,39 @@ def test_apprise():
assert a.notify(title='present', body=None) is True
assert a.notify(title="present", body="present") is True
# Send Attachment with success
attach = join(TEST_VAR_DIR, 'apprise-test.gif')
assert a.notify(
body='body', title='test', notify_type=NotifyType.INFO,
attach=attach) is True
# Send the attachment as an AppriseAttachment object
assert a.notify(
body='body', title='test', notify_type=NotifyType.INFO,
attach=AppriseAttachment(attach)) is True
# test a invalid attachment
assert a.notify(
body='body', title='test', notify_type=NotifyType.INFO,
attach='invalid://') is False
# Repeat the same tests above...
# however do it by directly accessing the object; this grants the similar
# results:
assert a[0].notify(
body='body', title='test', notify_type=NotifyType.INFO,
attach=attach) is True
# Send the attachment as an AppriseAttachment object
assert a[0].notify(
body='body', title='test', notify_type=NotifyType.INFO,
attach=AppriseAttachment(attach)) is True
# test a invalid attachment
assert a[0].notify(
body='body', title='test', notify_type=NotifyType.INFO,
attach='invalid://') is False
# Clear our server listings again
a.clear()

View File

@ -0,0 +1,388 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import sys
import pytest
from os.path import getsize
from os.path import join
from os.path import dirname
from apprise.AppriseAttachment import AppriseAttachment
from apprise.AppriseAsset import AppriseAsset
from apprise.attachment.AttachBase import AttachBase
from apprise.attachment import SCHEMA_MAP as ATTACH_SCHEMA_MAP
from apprise.attachment import __load_matrix
# Disable logging for a cleaner testing output
import logging
logging.disable(logging.CRITICAL)
TEST_VAR_DIR = join(dirname(__file__), 'var')
def test_apprise_attachment():
"""
API: AppriseAttachment basic testing
"""
# Create ourselves an attachment object
aa = AppriseAttachment()
# There are no attachents loaded
assert len(aa) == 0
# Object can be directly checked as a boolean; response is False
# when there are no entries loaded
assert not aa
# An attachment object using a custom Apprise Asset object
# Set a cache expiry of 5 minutes (300 seconds)
aa = AppriseAttachment(asset=AppriseAsset(), cache=300)
# still no attachments added
assert len(aa) == 0
# Add a file by it's path
path = join(TEST_VAR_DIR, 'apprise-test.gif')
assert aa.add(path)
# There is now 1 attachment
assert len(aa) == 1
# our attachment took on our cache value
assert aa[0].cache == 300
# we can test the object as a boolean and get a value of True now
assert aa
# Add another entry already in it's AttachBase format
response = AppriseAttachment.instantiate(path)
assert isinstance(response, AttachBase)
assert aa.add(response, asset=AppriseAsset())
# There is now 2 attachments
assert len(aa) == 2
# No cache set, so our cache defaults to True
assert aa[1].cache is True
# Reset our object
aa = AppriseAttachment()
# We can add by lists as well in a variety of formats
attachments = (
path,
'file://{}?name=newfilename.gif?cache=120'.format(path),
AppriseAttachment.instantiate(
'file://{}?name=anotherfilename.gif'.format(path), cache=100),
)
# Add them
assert aa.add(attachments, cache=False)
# There is now 3 attachments
assert len(aa) == 3
# Take on our fixed cache value of False.
# The last entry will have our set value of 100
assert aa[0].cache is False
# Even though we set a value of 120, we take on the value of False because
# it was forced on the instantiate call
assert aa[1].cache is False
assert aa[2].cache == 100
# We can pop the last element off of the list as well
attachment = aa.pop()
assert isinstance(attachment, AttachBase)
# we can test of the attachment is valid using a boolean check:
assert attachment
assert len(aa) == 2
assert attachment.path == path
assert attachment.name == 'anotherfilename.gif'
assert attachment.mimetype == 'image/gif'
# elements can also be directly indexed
assert isinstance(aa[0], AttachBase)
assert isinstance(aa[1], AttachBase)
with pytest.raises(IndexError):
aa[2]
# We can iterate over attachments too:
for count, a in enumerate(aa):
assert isinstance(a, AttachBase)
# we'll never iterate more then the number of entries in our object
assert count < len(aa)
# Get the file-size of our image
expected_size = getsize(path) * len(aa)
# verify that's what we get as a result
assert aa.size() == expected_size
# Attachments can also be loaded during the instantiation of the
# AppriseAttachment object
aa = AppriseAttachment(attachments)
# There is now 3 attachments
assert len(aa) == 3
# Reset our object
aa.clear()
assert len(aa) == 0
assert not aa
assert aa.add(AppriseAttachment.instantiate(
'file://{}?name=andanother.png&cache=Yes'.format(path)))
assert aa.add(AppriseAttachment.instantiate(
'file://{}?name=andanother.png&cache=No'.format(path)))
AppriseAttachment.instantiate(
'file://{}?name=andanother.png&cache=600'.format(path))
assert aa.add(AppriseAttachment.instantiate(
'file://{}?name=andanother.png&cache=600'.format(path)))
assert len(aa) == 3
assert aa[0].cache is True
assert aa[1].cache is False
assert aa[2].cache == 600
# Negative cache are not allowed
assert not aa.add(AppriseAttachment.instantiate(
'file://{}?name=andanother.png&cache=-600'.format(path)))
# No length change
assert len(aa) == 3
# Reset our object
aa.clear()
# if instantiating attachments from the class, it will throw a TypeError
# if attachments couldn't be loaded
with pytest.raises(TypeError):
AppriseAttachment('garbage://')
# Garbage in produces garbage out
assert aa.add(None) is False
assert aa.add(object()) is False
assert aa.add(42) is False
# length remains unchanged
assert len(aa) == 0
# We can add by lists as well in a variety of formats
attachments = (
None,
object(),
42,
'garbage://',
)
# Add our attachments
assert aa.add(attachments) is False
# length remains unchanged
assert len(aa) == 0
# test cases when file simply doesn't exist
aa = AppriseAttachment('file://non-existant-file.png')
# Our length is still 1
assert len(aa) == 1
# Our object will still return a True
assert aa
# However our indexed entry will not
assert not aa[0]
# length will return 0
assert len(aa[0]) == 0
# Total length will also return 0
assert aa.size() == 0
def test_apprise_attachment_instantiate():
"""
API: AppriseAttachment.instantiate()
"""
assert AppriseAttachment.instantiate(
'file://?', suppress_exceptions=True) is None
assert AppriseAttachment.instantiate(
'invalid://?', suppress_exceptions=True) is None
class BadAttachType(AttachBase):
def __init__(self, **kwargs):
super(BadAttachType, self).__init__(**kwargs)
# We fail whenever we're initialized
raise TypeError()
# Store our bad attachment type in our schema map
ATTACH_SCHEMA_MAP['bad'] = BadAttachType
with pytest.raises(TypeError):
AppriseAttachment.instantiate(
'bad://path', suppress_exceptions=False)
# Same call but exceptions suppressed
assert AppriseAttachment.instantiate(
'bad://path', suppress_exceptions=True) is None
def test_apprise_attachment_matrix_load():
"""
API: AppriseAttachment() matrix initialization
"""
import apprise
class AttachmentDummy(AttachBase):
"""
A dummy wrapper for testing the different options in the load_matrix
function
"""
# The default descriptive name associated with the Notification
service_name = 'dummy'
# protocol as tuple
protocol = ('uh', 'oh')
# secure protocol as tuple
secure_protocol = ('no', 'yes')
class AttachmentDummy2(AttachBase):
"""
A dummy wrapper for testing the different options in the load_matrix
function
"""
# The default descriptive name associated with the Notification
service_name = 'dummy2'
# secure protocol as tuple
secure_protocol = ('true', 'false')
class AttachmentDummy3(AttachBase):
"""
A dummy wrapper for testing the different options in the load_matrix
function
"""
# The default descriptive name associated with the Notification
service_name = 'dummy3'
# secure protocol as string
secure_protocol = 'true'
class AttachmentDummy4(AttachBase):
"""
A dummy wrapper for testing the different options in the load_matrix
function
"""
# The default descriptive name associated with the Notification
service_name = 'dummy4'
# protocol as string
protocol = 'true'
# Generate ourselves a fake entry
apprise.attachment.AttachmentDummy = AttachmentDummy
apprise.attachment.AttachmentDummy2 = AttachmentDummy2
apprise.attachment.AttachmentDummy3 = AttachmentDummy3
apprise.attachment.AttachmentDummy4 = AttachmentDummy4
__load_matrix()
# Call it again so we detect our entries already loaded
__load_matrix()
def test_attachment_matrix_dynamic_importing(tmpdir):
"""
API: Apprise() Attachment Matrix Importing
"""
# Make our new path valid
suite = tmpdir.mkdir("apprise_attach_test_suite")
suite.join("__init__.py").write('')
module_name = 'badattach'
# Update our path to point to our new test suite
sys.path.insert(0, str(suite))
# Create a base area to work within
base = suite.mkdir(module_name)
base.join("__init__.py").write('')
# Test no app_id
base.join('AttachBadFile1.py').write(
"""
class AttachBadFile1(object):
pass""")
# No class of the same name
base.join('AttachBadFile2.py').write(
"""
class BadClassName(object):
pass""")
# Exception thrown
base.join('AttachBadFile3.py').write("""raise ImportError()""")
# Utilizes a schema:// already occupied (as string)
base.join('AttachGoober.py').write(
"""
from apprise import AttachBase
class AttachGoober(AttachBase):
# This class tests the fact we have a new class name, but we're
# trying to over-ride items previously used
# The default simple (insecure) protocol
protocol = 'http'
# The default secure protocol
secure_protocol = 'https'""")
# Utilizes a schema:// already occupied (as tuple)
base.join('AttachBugger.py').write("""
from apprise import AttachBase
class AttachBugger(AttachBase):
# This class tests the fact we have a new class name, but we're
# trying to over-ride items previously used
# The default simple (insecure) protocol
protocol = ('http', 'bugger-test' )
# The default secure protocol
secure_protocol = ('https', 'bugger-tests')""")
__load_matrix(path=str(base), name=module_name)

92
test/test_attach_base.py Normal file
View File

@ -0,0 +1,92 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import mock
import pytest
from apprise.attachment.AttachBase import AttachBase
# Disable logging for a cleaner testing output
import logging
logging.disable(logging.CRITICAL)
def test_mimetype_initialization():
"""
API: AttachBase() mimetype initialization
"""
with mock.patch('mimetypes.init') as mock_init:
with mock.patch('mimetypes.inited', True):
AttachBase()
assert mock_init.call_count == 0
with mock.patch('mimetypes.init') as mock_init:
with mock.patch('mimetypes.inited', False):
AttachBase()
assert mock_init.call_count == 1
def test_attach_base():
"""
API: AttachBase()
"""
# an invalid mime-type
with pytest.raises(TypeError):
AttachBase(**{'mimetype': 'invalid'})
# a valid mime-type does not cause an exception to throw
AttachBase(**{'mimetype': 'image/png'})
# Create an object with no mimetype over-ride
obj = AttachBase()
# We can not process name/path/mimetype at a Base level
with pytest.raises(NotImplementedError):
obj.download()
with pytest.raises(NotImplementedError):
obj.name
with pytest.raises(NotImplementedError):
obj.path
with pytest.raises(NotImplementedError):
obj.mimetype
# Unsupported URLs are not parsed
assert AttachBase.parse_url(url='invalid://') is None
# Valid URL & Valid Format
results = AttachBase.parse_url(url='file://relative/path')
assert isinstance(results, dict)
# No mime is defined
assert results.get('mimetype') is None
# Valid URL & Valid Format with mime type set
results = AttachBase.parse_url(url='file://relative/path?mime=image/jpeg')
assert isinstance(results, dict)
# mime defined
assert results.get('mimetype') == 'image/jpeg'

181
test/test_attach_file.py Normal file
View File

@ -0,0 +1,181 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import re
import time
import mock
from os.path import dirname
from os.path import join
from apprise.attachment.AttachBase import AttachBase
from apprise.attachment.AttachFile import AttachFile
from apprise import AppriseAttachment
# Disable logging for a cleaner testing output
import logging
logging.disable(logging.CRITICAL)
TEST_VAR_DIR = join(dirname(__file__), 'var')
def test_attach_file_parse_url():
"""
API: AttachFile().parse_url()
"""
# bad entry
assert AttachFile.parse_url('garbage://') is None
# no file path specified
assert AttachFile.parse_url('file://') is None
def test_file_expiry(tmpdir):
"""
API: AttachFile Expiry
"""
path = join(TEST_VAR_DIR, 'apprise-test.gif')
image = tmpdir.mkdir("apprise_file").join("test.jpg")
with open(path, 'rb') as data:
image.write(data)
aa = AppriseAttachment.instantiate(str(image), cache=30)
# Our file is now available
assert aa.exists()
# Our second call has the file already downloaded, but now compares
# it's date against when we consider it to have expire. We're well
# under 30 seconds here (our set value), so this will succeed
assert aa.exists()
with mock.patch('time.time', return_value=time.time() + 31):
# This will force a re-download as our cache will have
# expired
assert aa.exists()
with mock.patch('time.time', side_effect=OSError):
# We will throw an exception
assert aa.exists()
def test_attach_file():
"""
API: AttachFile()
"""
# Simple gif test
path = join(TEST_VAR_DIR, 'apprise-test.gif')
response = AppriseAttachment.instantiate(path)
assert isinstance(response, AttachFile)
assert response.path == path
assert response.name == 'apprise-test.gif'
assert response.mimetype == 'image/gif'
# Download is successful and has already been called by now; below pulls
# results from cache
assert response.download()
assert response.url().startswith('file://{}'.format(path))
# No mime-type and/or filename over-ride was specified, so therefore it
# won't show up in the generated URL
assert re.search(r'[?&]mime=', response.url()) is None
assert re.search(r'[?&]name=', response.url()) is None
# File handling (even if image is set to maxium allowable)
response = AppriseAttachment.instantiate(path)
assert isinstance(response, AttachFile)
with mock.patch('os.path.getsize', return_value=AttachBase.max_file_size):
# It will still work
assert response.path == path
# File handling when size is to large
response = AppriseAttachment.instantiate(path)
assert isinstance(response, AttachFile)
with mock.patch(
'os.path.getsize', return_value=AttachBase.max_file_size + 1):
# We can't work in this case
assert response.path is None
# File handling when image is not available
response = AppriseAttachment.instantiate(path)
assert isinstance(response, AttachFile)
with mock.patch('os.path.isfile', return_value=False):
# This triggers a full check and will fail the isfile() check
assert response.path is None
# The call to AttachBase.path automatically triggers a call to download()
# but this same is done with a call to AttachBase.name as well. Above
# test cases reference 'path' right after instantiation; here we reference
# 'name'
response = AppriseAttachment.instantiate(path)
assert response.name == 'apprise-test.gif'
assert response.path == path
assert response.mimetype == 'image/gif'
# No mime-type and/or filename over-ride was specified, so therefore it
# won't show up in the generated URL
assert re.search(r'[?&]mime=', response.url()) is None
assert re.search(r'[?&]name=', response.url()) is None
# continuation to cheking 'name' instead of 'path' first where our call
# to download() fails
response = AppriseAttachment.instantiate(path)
assert isinstance(response, AttachFile)
with mock.patch('os.path.isfile', return_value=False):
# This triggers a full check and will fail the isfile() check
assert response.name is None
# The call to AttachBase.path automatically triggers a call to download()
# but this same is done with a call to AttachBase.mimetype as well. Above
# test cases reference 'path' right after instantiation; here we reference
# 'mimetype'
response = AppriseAttachment.instantiate(path)
assert response.mimetype == 'image/gif'
assert response.name == 'apprise-test.gif'
assert response.path == path
# No mime-type and/or filename over-ride was specified, so therefore it
# won't show up in the generated URL
assert re.search(r'[?&]mime=', response.url()) is None
assert re.search(r'[?&]name=', response.url()) is None
# continuation to cheking 'name' instead of 'path' first where our call
# to download() fails
response = AppriseAttachment.instantiate(path)
assert isinstance(response, AttachFile)
with mock.patch('os.path.isfile', return_value=False):
# download() fails so we don't have a mimetpe
assert response.mimetype is None
assert response.name is None
assert response.path is None
# This triggers a full check and will fail the isfile() check
# Force a mime-type and new name
response = AppriseAttachment.instantiate(
'file://{}?mime={}&name={}'.format(path, 'image/jpeg', 'test.jpeg'))
assert isinstance(response, AttachFile)
assert response.path == path
assert response.name == 'test.jpeg'
assert response.mimetype == 'image/jpeg'
# We will match on mime type now (%2F = /)
assert re.search(r'[?&]mime=image%2Fjpeg', response.url(), re.I)
assert re.search(r'[?&]name=test\.jpeg', response.url(), re.I)

351
test/test_attach_http.py Normal file
View File

@ -0,0 +1,351 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import re
import six
import mock
import requests
import mimetypes
from os.path import join
from os.path import dirname
from os.path import getsize
from apprise.attachment.AttachHTTP import AttachHTTP
from apprise import AppriseAttachment
from apprise.plugins.NotifyBase import NotifyBase
from apprise.plugins import SCHEMA_MAP
# Disable logging for a cleaner testing output
import logging
logging.disable(logging.CRITICAL)
TEST_VAR_DIR = join(dirname(__file__), 'var')
# Some exception handling we'll use
REQUEST_EXCEPTIONS = (
requests.ConnectionError(
0, 'requests.ConnectionError() not handled'),
requests.RequestException(
0, 'requests.RequestException() not handled'),
requests.HTTPError(
0, 'requests.HTTPError() not handled'),
requests.ReadTimeout(
0, 'requests.ReadTimeout() not handled'),
requests.TooManyRedirects(
0, 'requests.TooManyRedirects() not handled'),
# Throw OSError exceptions too
OSError("SystemError")
)
def test_attach_http_parse_url():
"""
API: AttachHTTP().parse_url()
"""
# bad entry
assert AttachHTTP.parse_url('garbage://') is None
# no url specified
assert AttachHTTP.parse_url('http://') is None
@mock.patch('requests.get')
def test_attach_http(mock_get):
"""
API: AttachHTTP() object
"""
# Define our good:// url
class GoodNotification(NotifyBase):
def __init__(self, *args, **kwargs):
super(GoodNotification, self).__init__(*args, **kwargs)
def notify(self, *args, **kwargs):
# Pretend everything is okay
return True
def url(self):
# Support url() function
return ''
# Store our good notification in our schema map
SCHEMA_MAP['good'] = GoodNotification
# Temporary path
path = join(TEST_VAR_DIR, 'apprise-test.gif')
class DummyResponse(object):
"""
A dummy response used to manage our object
"""
status_code = requests.codes.ok
headers = {
'Content-Length': getsize(path),
'Content-Type': 'image/gif',
}
# Pointer to file
ptr = None
# used to return random keep-alive chunks
_keepalive_chunk_ref = 0
def close(self):
return
def iter_content(self, chunk_size=1024):
"""Lazy function (generator) to read a file piece by piece.
Default chunk size: 1k."""
while True:
self._keepalive_chunk_ref += 1
if 16 % self._keepalive_chunk_ref == 0:
# Yield a keep-alive block
yield ''
data = self.ptr.read(chunk_size)
if not data:
break
yield data
def raise_for_status(self):
return
def __enter__(self):
self.ptr = open(path, 'rb')
return self
def __exit__(self, *args, **kwargs):
self.ptr.close()
# Prepare Mock
dummy_response = DummyResponse()
mock_get.return_value = dummy_response
results = AttachHTTP.parse_url(
'http://user:pass@localhost/apprise.gif?+key=value')
assert isinstance(results, dict)
attachment = AttachHTTP(**results)
assert isinstance(attachment.url(), six.string_types) is True
# No mime-type and/or filename over-ride was specified, so therefore it
# won't show up in the generated URL
assert re.search(r'[?&]mime=', attachment.url()) is None
assert re.search(r'[?&]name=', attachment.url()) is None
# No Content-Disposition; so we use filename from path
assert attachment.name == 'apprise.gif'
assert attachment.mimetype == 'image/gif'
results = AttachHTTP.parse_url(
'http://localhost:3000/noname.gif?name=usethis.jpg&mime=image/jpeg')
assert isinstance(results, dict)
attachment = AttachHTTP(**results)
assert isinstance(attachment.url(), six.string_types) is True
# both mime and name over-ridden
assert re.search(r'[?&]mime=image%2Fjpeg', attachment.url())
assert re.search(r'[?&]name=usethis.jpg', attachment.url())
# No Content-Disposition; so we use filename from path
assert attachment.name == 'usethis.jpg'
assert attachment.mimetype == 'image/jpeg'
# Edge case; download called a second time when content already retrieved
assert attachment.download()
assert attachment
assert len(attachment) == getsize(path)
# No path specified
# No Content-Disposition specified
# No filename (because no path)
results = AttachHTTP.parse_url('http://localhost')
assert isinstance(results, dict)
attachment = AttachHTTP(**results)
assert isinstance(attachment.url(), six.string_types) is True
# No mime-type and/or filename over-ride was specified, so therefore it
# won't show up in the generated URL
assert re.search(r'[?&]mime=', attachment.url()) is None
assert re.search(r'[?&]name=', attachment.url()) is None
assert attachment.mimetype == 'image/gif'
# Because we could determine our mime type, we could build an extension
# for our unknown filename
assert attachment.name == '{}{}'.format(
AttachHTTP.unknown_filename,
mimetypes.guess_extension(attachment.mimetype)
)
assert attachment
assert len(attachment) == getsize(path)
# Set Content-Length to a value that exceeds our maximum allowable
dummy_response.headers['Content-Length'] = AttachHTTP.max_file_size + 1
results = AttachHTTP.parse_url('http://localhost/toobig.jpg')
assert isinstance(results, dict)
attachment = AttachHTTP(**results)
# we can not download this attachment
assert not attachment
assert isinstance(attachment.url(), six.string_types) is True
# No mime-type and/or filename over-ride was specified, so therefore it
# won't show up in the generated URL
assert re.search(r'[?&]mime=', attachment.url()) is None
assert re.search(r'[?&]name=', attachment.url()) is None
assert attachment.mimetype is None
assert attachment.name is None
assert len(attachment) == 0
# Handle cases where we have no Content-Length and we need to rely
# on what is read as it is streamed
del dummy_response.headers['Content-Length']
# No path specified
# No Content-Disposition specified
# No Content-Length specified
# No filename (because no path)
results = AttachHTTP.parse_url('http://localhost/no-length.gif')
assert isinstance(results, dict)
attachment = AttachHTTP(**results)
assert isinstance(attachment.url(), six.string_types) is True
# No mime-type and/or filename over-ride was specified, so therefore it
# won't show up in the generated URL
assert re.search(r'[?&]mime=', attachment.url()) is None
assert re.search(r'[?&]name=', attachment.url()) is None
assert attachment.mimetype == 'image/gif'
# Because we could determine our mime type, we could build an extension
# for our unknown filename
assert attachment.name == 'no-length.gif'
assert attachment
assert len(attachment) == getsize(path)
# Set our limit to be the length of our image; everything should work
# without a problem
max_file_size = AttachHTTP.max_file_size
AttachHTTP.max_file_size = getsize(path)
# Set ourselves a Content-Disposition (providing a filename)
dummy_response.headers['Content-Disposition'] = \
'attachment; filename="myimage.gif"'
# Remove our content type so we're forced to guess it from our filename
# specified in our Content-Disposition
del dummy_response.headers['Content-Type']
# No path specified
# No Content-Length specified
# Filename in Content-Disposition (over-rides one found in path
results = AttachHTTP.parse_url('http://user@localhost/ignore-filename.gif')
assert isinstance(results, dict)
attachment = AttachHTTP(**results)
assert isinstance(attachment.url(), six.string_types) is True
# No mime-type and/or filename over-ride was specified, so therefore it
# won't show up in the generated URL
assert re.search(r'[?&]mime=', attachment.url()) is None
assert re.search(r'[?&]name=', attachment.url()) is None
assert attachment.mimetype == 'image/gif'
# Because we could determine our mime type, we could build an extension
# for our unknown filename
assert attachment.name == 'myimage.gif'
assert attachment
assert len(attachment) == getsize(path)
# Similar to test above except we make our max message size just 1 byte
# smaller then our gif file. This will cause us to fail to read the
# attachment
AttachHTTP.max_file_size = getsize(path) - 1
results = AttachHTTP.parse_url('http://localhost/toobig.jpg')
assert isinstance(results, dict)
attachment = AttachHTTP(**results)
# we can not download this attachment
assert not attachment
assert isinstance(attachment.url(), six.string_types) is True
# No mime-type and/or filename over-ride was specified, so therefore it
# won't show up in the generated URL
assert re.search(r'[?&]mime=', attachment.url()) is None
assert re.search(r'[?&]name=', attachment.url()) is None
assert attachment.mimetype is None
assert attachment.name is None
assert len(attachment) == 0
# Disable our file size limitations
AttachHTTP.max_file_size = 0
results = AttachHTTP.parse_url('http://user@localhost')
assert isinstance(results, dict)
attachment = AttachHTTP(**results)
assert isinstance(attachment.url(), six.string_types) is True
# No mime-type and/or filename over-ride was specified, so therefore it
# won't show up in the generated URL
assert re.search(r'[?&]mime=', attachment.url()) is None
assert re.search(r'[?&]name=', attachment.url()) is None
assert attachment.mimetype == 'image/gif'
# Because we could determine our mime type, we could build an extension
# for our unknown filename
assert attachment.name == 'myimage.gif'
assert attachment
assert len(attachment) == getsize(path)
# Set our header up with an invalid Content-Length; we can still process
# this data. It just means we track it lower when reading back content
dummy_response.headers = {
'Content-Length': 'invalid'
}
results = AttachHTTP.parse_url('http://localhost/invalid-length.gif')
assert isinstance(results, dict)
attachment = AttachHTTP(**results)
assert isinstance(attachment.url(), six.string_types) is True
# No mime-type and/or filename over-ride was specified, so therefore it
# won't show up in the generated URL
assert re.search(r'[?&]mime=', attachment.url()) is None
assert re.search(r'[?&]name=', attachment.url()) is None
assert attachment.mimetype == 'image/gif'
# Because we could determine our mime type, we could build an extension
# for our unknown filename
assert attachment.name == 'invalid-length.gif'
assert attachment
# Give ourselves nothing to work with
dummy_response.headers = {}
results = AttachHTTP.parse_url('http://user@localhost')
assert isinstance(results, dict)
attachment = AttachHTTP(**results)
# we can not download this attachment
assert attachment
assert isinstance(attachment.url(), six.string_types) is True
# No mime-type and/or filename over-ride was specified, so therefore it
# won't show up in the generated URL
assert re.search(r'[?&]mime=', attachment.url()) is None
assert re.search(r'[?&]name=', attachment.url()) is None
# Handle edge-case where detected_name is None for whatever reason
attachment.detected_name = None
assert attachment.mimetype == attachment.unknown_mimetype
assert attachment.name.startswith(AttachHTTP.unknown_filename)
assert len(attachment) == getsize(path)
# Exception handling
mock_get.return_value = None
for _exception in REQUEST_EXCEPTIONS:
aa = AppriseAttachment.instantiate(
'http://localhost/exception.gif?cache=30')
assert isinstance(aa, AttachHTTP)
mock_get.side_effect = _exception
assert not aa
# Restore value
AttachHTTP.max_file_size = max_file_size

View File

@ -25,6 +25,8 @@
from __future__ import print_function
import re
import mock
from os.path import dirname
from os.path import join
from apprise import cli
from apprise import NotifyBase
from click.testing import CliRunner
@ -227,10 +229,11 @@ def test_apprise_cli(tmpdir):
])
assert result.exit_code == 2
# Here is a case where we get what was expected
# Here is a case where we get what was expected; we also attach a file
result = runner.invoke(cli.main, [
'-b', 'has taga',
'--config', str(t),
'--attach', join(dirname(__file__), 'var', 'apprise-test.gif'),
'--tag', 'myTag',
])
assert result.exit_code == 0

191
test/test_discord_plugin.py Normal file
View File

@ -0,0 +1,191 @@
# -*- coding: utf-8 -*-
#
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import os
import six
import mock
import pytest
import requests
from apprise import Apprise
from apprise import AppriseAttachment
from apprise import plugins
from apprise import NotifyType
from apprise import NotifyFormat
# Disable logging for a cleaner testing output
import logging
logging.disable(logging.CRITICAL)
# Attachment Directory
TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var')
@mock.patch('requests.post')
def test_discord_plugin(mock_post):
"""
API: NotifyDiscord() General Checks
"""
# Disable Throttling to speed testing
plugins.NotifyBase.request_rate_per_sec = 0
# Initialize some generic (but valid) tokens
webhook_id = 'A' * 24
webhook_token = 'B' * 64
# Prepare Mock
mock_post.return_value = requests.Request()
mock_post.return_value.status_code = requests.codes.ok
# Invalid webhook id
with pytest.raises(TypeError):
plugins.NotifyDiscord(webhook_id=None, webhook_token=webhook_token)
# Invalid webhook id (whitespace)
with pytest.raises(TypeError):
plugins.NotifyDiscord(webhook_id=" ", webhook_token=webhook_token)
# Invalid webhook token
with pytest.raises(TypeError):
plugins.NotifyDiscord(webhook_id=webhook_id, webhook_token=None)
# Invalid webhook token (whitespace)
with pytest.raises(TypeError):
plugins.NotifyDiscord(webhook_id=webhook_id, webhook_token=" ")
obj = plugins.NotifyDiscord(
webhook_id=webhook_id,
webhook_token=webhook_token,
footer=True, thumbnail=False)
# Test that we get a string response
assert isinstance(obj.url(), six.string_types) is True
# This call includes an image with it's payload:
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO) is True
# Test our header parsing
test_markdown = "## Heading one\nbody body\n\n" + \
"# Heading 2 ##\n\nTest\n\n" + \
"more content\n" + \
"even more content \t\r\n\n\n" + \
"# Heading 3 ##\n\n\n" + \
"normal content\n" + \
"# heading 4\n" + \
"#### Heading 5"
results = obj.extract_markdown_sections(test_markdown)
assert isinstance(results, list) is True
# We should have 5 sections (since there are 5 headers identified above)
assert len(results) == 5
assert results[0]['name'] == 'Heading one'
assert results[0]['value'] == '```md\nbody body\n```'
assert results[1]['name'] == 'Heading 2'
assert results[1]['value'] == \
'```md\nTest\n\nmore content\neven more content\n```'
assert results[2]['name'] == 'Heading 3'
assert results[2]['value'] == \
'```md\nnormal content\n```'
assert results[3]['name'] == 'heading 4'
assert results[3]['value'] == '```md\n\n```'
assert results[4]['name'] == 'Heading 5'
assert results[4]['value'] == '```md\n\n```'
# Test our markdown
obj = Apprise.instantiate(
'discord://{}/{}/?format=markdown'.format(webhook_id, webhook_token))
assert isinstance(obj, plugins.NotifyDiscord)
assert obj.notify(
body=test_markdown, title='title', notify_type=NotifyType.INFO) is True
# Empty String
results = obj.extract_markdown_sections("")
assert isinstance(results, list) is True
assert len(results) == 0
# String without Heading
test_markdown = "Just a string without any header entries.\n" + \
"A second line"
results = obj.extract_markdown_sections(test_markdown)
assert isinstance(results, list) is True
assert len(results) == 0
# Use our test markdown string during a notification
assert obj.notify(
body=test_markdown, title='title', notify_type=NotifyType.INFO) is True
# Create an apprise instance
a = Apprise()
# Our processing is slightly different when we aren't using markdown
# as we do not pre-parse content during our notifications
assert a.add(
'discord://{webhook_id}/{webhook_token}/'
'?format=markdown&footer=Yes'.format(
webhook_id=webhook_id,
webhook_token=webhook_token)) is True
# This call includes an image with it's payload:
assert a.notify(body=test_markdown, title='title',
notify_type=NotifyType.INFO,
body_format=NotifyFormat.TEXT) is True
assert a.notify(body=test_markdown, title='title',
notify_type=NotifyType.INFO,
body_format=NotifyFormat.MARKDOWN) is True
# Toggle our logo availability
a.asset.image_url_logo = None
assert a.notify(
body='body', title='title', notify_type=NotifyType.INFO) is True
@mock.patch('requests.post')
def test_discord_attachments(mock_post):
"""
API: NotifyDiscord() Attachment Checks
"""
# Disable Throttling to speed testing
plugins.NotifyBase.request_rate_per_sec = 0
# Initialize some generic (but valid) tokens
webhook_id = 'C' * 24
webhook_token = 'D' * 64
# Prepare Mock return object
response = mock.Mock()
response.status_code = requests.codes.ok
# Throw an exception on the second call to requests.post()
mock_post.side_effect = [response, OSError()]
# Test our markdown
obj = Apprise.instantiate(
'discord://{}/{}/?format=markdown'.format(webhook_id, webhook_token))
# attach our content
attach = AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.gif'))
# We'll fail now because of an internal exception
assert obj.send(body="test", attach=attach) is False

View File

@ -23,6 +23,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import os
import re
import six
import mock
@ -31,12 +32,16 @@ import smtplib
from apprise import plugins
from apprise import NotifyType
from apprise import Apprise
from apprise import AttachBase
from apprise import AppriseAttachment
from apprise.plugins import NotifyEmailBase
# Disable logging for a cleaner testing output
import logging
logging.disable(logging.CRITICAL)
# Attachment Directory
TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var')
TEST_URLS = (
##################################
@ -462,6 +467,42 @@ def test_smtplib_send_okay(mock_smtplib):
assert obj.notify(
body='body', title='test', notify_type=NotifyType.INFO) is True
# Create an apprise object to work with as well
a = Apprise()
assert a.add('mailto://user:pass@gmail.com?format=text')
# Send Attachment with success
attach = os.path.join(TEST_VAR_DIR, 'apprise-test.gif')
assert obj.notify(
body='body', title='test', notify_type=NotifyType.INFO,
attach=attach) is True
# same results happen from our Apprise object
assert a.notify(body='body', title='test', attach=attach) is True
# test using an Apprise Attachment object
assert obj.notify(
body='body', title='test', notify_type=NotifyType.INFO,
attach=AppriseAttachment(attach)) is True
# same results happen from our Apprise object
assert a.notify(
body='body', title='test', attach=AppriseAttachment(attach)) is True
max_file_size = AttachBase.max_file_size
# Now do a case where the file can't be sent
AttachBase.max_file_size = 1
assert obj.notify(
body='body', title='test', notify_type=NotifyType.INFO,
attach=attach) is False
# same results happen from our Apprise object
assert a.notify(body='body', title='test', attach=attach) is False
# Restore value
AttachBase.max_file_size = max_file_size
def test_email_url_escaping():
"""

View File

@ -23,6 +23,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import os
import six
import pytest
import requests
@ -37,6 +38,7 @@ from apprise import NotifyType
from apprise import NotifyBase
from apprise import Apprise
from apprise import AppriseAsset
from apprise import AppriseAttachment
from apprise.common import NotifyFormat
from apprise.common import OverflowMode
@ -61,6 +63,9 @@ REQUEST_EXCEPTIONS = (
0, 'requests.TooManyRedirects() not handled'),
)
# Attachment Directory
TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var')
TEST_URLS = (
##################################
# NotifyBoxcar
@ -2483,10 +2488,10 @@ TEST_URLS = (
# NotifySlack
##################################
('slack://', {
'instance': None,
'instance': TypeError,
}),
('slack://:@/', {
'instance': None,
'instance': TypeError,
}),
('slack://T1JJ3T3L2', {
# Just Token 1 provided
@ -2503,6 +2508,10 @@ TEST_URLS = (
# There is an invalid channel that we will fail to deliver to
# as a result the response type will be false
'response': False,
'requests_response_text': {
'ok': False,
'message': 'Bad Channel',
},
}),
('slack://T1JJ3T3L2/A1BRTD4JD/TIiajkdnlazkcOXrIdevi7FQ/#channel', {
# No username specified; this is still okay as we sub in
@ -2510,11 +2519,19 @@ TEST_URLS = (
'instance': plugins.NotifySlack,
# don't include an image by default
'include_image': False,
'requests_response_text': {
'ok': True,
'message': '',
},
}),
('slack://T1JJ3T3L2/A1BRTD4JD/TIiajkdnlazkcOXrIdevi7FQ/+id/@id/', {
# + encoded id,
# @ userid
'instance': plugins.NotifySlack,
'requests_response_text': {
'ok': True,
'message': '',
},
}),
('slack://username@T1JJ3T3L2/A1BRTD4JD/TIiajkdnlazkcOXrIdevi7FQ/' \
'?to=#nuxref', {
@ -2522,23 +2539,67 @@ TEST_URLS = (
# Our expected url(privacy=True) startswith() response:
'privacy_url': 'slack://username@T...2/A...D/T...Q/',
'requests_response_text': {
'ok': True,
'message': '',
},
}),
('slack://username@T1JJ3T3L2/A1BRTD4JD/TIiajkdnlazkcOXrIdevi7FQ/#nuxref', {
'instance': plugins.NotifySlack,
'requests_response_text': {
'ok': True,
'message': '',
},
}),
# Test using a bot-token (also test footer set to no flag)
('slack://username@xoxb-1234-1234-abc124/#nuxref?footer=no', {
'instance': plugins.NotifySlack,
'requests_response_text': {
'ok': True,
'message': '',
# support attachments
'file': {
'url_private': 'http://localhost/',
},
},
}),
('slack://username@xoxb-1234-1234-abc124/#nuxref', {
'instance': plugins.NotifySlack,
'requests_response_text': {
'ok': True,
'message': '',
},
# we'll fail to send attachments because we had no 'file' response in
# our object
'response': False,
}),
('slack://username@T1JJ3T3L2/A1BRTD4JD/TIiajkdnlazkcOXrIdevi7FQ', {
# Missing a channel, falls back to webhook channel bindings
'instance': plugins.NotifySlack,
'requests_response_text': {
'ok': True,
'message': '',
},
}),
# Native URL Support, take the slack URL and still build from it
('https://hooks.slack.com/services/{}/{}/{}'.format(
'A' * 9, 'B' * 9, 'c' * 24), {
'instance': plugins.NotifySlack,
'requests_response_text': {
'ok': True,
'message': '',
},
}),
# Native URL Support with arguments
('https://hooks.slack.com/services/{}/{}/{}?format=text'.format(
'A' * 9, 'B' * 9, 'c' * 24), {
'instance': plugins.NotifySlack,
'requests_response_text': {
'ok': True,
'message': '',
},
}),
('slack://username@INVALID/A1BRTD4JD/TIiajkdnlazkcOXrIdevi7FQ/#cool', {
# invalid 1st Token
@ -2557,18 +2618,30 @@ TEST_URLS = (
# force a failure
'response': False,
'requests_response_code': requests.codes.internal_server_error,
'requests_response_text': {
'ok': False,
'message': '',
},
}),
('slack://respect@T1JJ3T3L2/A1BRTD4JD/TIiajkdnlazkcOXrIdevi7FQ/#a', {
'instance': plugins.NotifySlack,
# throw a bizzare code forcing us to fail to look it up
'response': False,
'requests_response_code': 999,
'requests_response_text': {
'ok': False,
'message': '',
},
}),
('slack://notify@T1JJ3T3L2/A1BRTD4JD/TIiajkdnlazkcOXrIdevi7FQ/#b', {
'instance': plugins.NotifySlack,
# Throws a series of connection and transfer exceptions when this flag
# is set and tests that we gracfully handle them
'test_requests_exceptions': True,
'requests_response_text': {
'ok': False,
'message': '',
},
}),
##################################
@ -3613,6 +3686,26 @@ def test_rest_plugins(mock_post, mock_get):
notify_type=notify_type,
overflow=OverflowMode.SPLIT) == notify_response
# Test single attachment support; even if the service
# doesn't support attachments, it should still gracefully
# ignore the data
attach = os.path.join(TEST_VAR_DIR, 'apprise-test.gif')
assert obj.notify(
body=body, title=title,
notify_type=notify_type,
attach=attach) == notify_response
# Same results should apply to a list of attachments
attach = AppriseAttachment((
os.path.join(TEST_VAR_DIR, 'apprise-test.gif'),
os.path.join(TEST_VAR_DIR, 'apprise-test.png'),
os.path.join(TEST_VAR_DIR, 'apprise-test.jpeg'),
))
assert obj.notify(
body=body, title=title,
notify_type=notify_type,
attach=attach) == notify_response
else:
# Disable throttling
obj.request_rate_per_sec = 0
@ -3756,125 +3849,6 @@ def test_notify_boxcar_plugin(mock_post, mock_get):
assert len(p.device_tokens) == 3
@mock.patch('requests.get')
@mock.patch('requests.post')
def test_notify_discord_plugin(mock_post, mock_get):
"""
API: NotifyDiscord() Extra Checks
"""
# Disable Throttling to speed testing
plugins.NotifyBase.request_rate_per_sec = 0
# Initialize some generic (but valid) tokens
webhook_id = 'A' * 24
webhook_token = 'B' * 64
# Prepare Mock
mock_get.return_value = requests.Request()
mock_post.return_value = requests.Request()
mock_post.return_value.status_code = requests.codes.ok
mock_get.return_value.status_code = requests.codes.ok
# Invalid webhook id
with pytest.raises(TypeError):
plugins.NotifyDiscord(webhook_id=None, webhook_token=webhook_token)
# Invalid webhook id (whitespace)
with pytest.raises(TypeError):
plugins.NotifyDiscord(webhook_id=" ", webhook_token=webhook_token)
# Invalid webhook token
with pytest.raises(TypeError):
plugins.NotifyDiscord(webhook_id=webhook_id, webhook_token=None)
# Invalid webhook token (whitespace)
with pytest.raises(TypeError):
plugins.NotifyDiscord(webhook_id=webhook_id, webhook_token=" ")
obj = plugins.NotifyDiscord(
webhook_id=webhook_id,
webhook_token=webhook_token,
footer=True, thumbnail=False)
# This call includes an image with it's payload:
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO) is True
# Test our header parsing
test_markdown = "## Heading one\nbody body\n\n" + \
"# Heading 2 ##\n\nTest\n\n" + \
"more content\n" + \
"even more content \t\r\n\n\n" + \
"# Heading 3 ##\n\n\n" + \
"normal content\n" + \
"# heading 4\n" + \
"#### Heading 5"
results = obj.extract_markdown_sections(test_markdown)
assert isinstance(results, list) is True
# We should have 5 sections (since there are 5 headers identified above)
assert len(results) == 5
assert results[0]['name'] == 'Heading one'
assert results[0]['value'] == '```md\nbody body\n```'
assert results[1]['name'] == 'Heading 2'
assert results[1]['value'] == \
'```md\nTest\n\nmore content\neven more content\n```'
assert results[2]['name'] == 'Heading 3'
assert results[2]['value'] == \
'```md\nnormal content\n```'
assert results[3]['name'] == 'heading 4'
assert results[3]['value'] == '```md\n\n```'
assert results[4]['name'] == 'Heading 5'
assert results[4]['value'] == '```md\n\n```'
# Test our markdown
obj = Apprise.instantiate(
'discord://{}/{}/?format=markdown'.format(webhook_id, webhook_token))
assert isinstance(obj, plugins.NotifyDiscord)
assert obj.notify(
body=test_markdown, title='title', notify_type=NotifyType.INFO) is True
# Empty String
results = obj.extract_markdown_sections("")
assert isinstance(results, list) is True
assert len(results) == 0
# String without Heading
test_markdown = "Just a string without any header entries.\n" + \
"A second line"
results = obj.extract_markdown_sections(test_markdown)
assert isinstance(results, list) is True
assert len(results) == 0
# Use our test markdown string during a notification
assert obj.notify(
body=test_markdown, title='title', notify_type=NotifyType.INFO) is True
# Create an apprise instance
a = Apprise()
# Our processing is slightly different when we aren't using markdown
# as we do not pre-parse content during our notifications
assert a.add(
'discord://{webhook_id}/{webhook_token}/'
'?format=markdown&footer=Yes'.format(
webhook_id=webhook_id,
webhook_token=webhook_token)) is True
# This call includes an image with it's payload:
assert a.notify(body=test_markdown, title='title',
notify_type=NotifyType.INFO,
body_format=NotifyFormat.TEXT) is True
assert a.notify(body=test_markdown, title='title',
notify_type=NotifyType.INFO,
body_format=NotifyFormat.MARKDOWN) is True
# Toggle our logo availability
a.asset.image_url_logo = None
assert a.notify(
body='body', title='title', notify_type=NotifyType.INFO) is True
@mock.patch('requests.get')
@mock.patch('requests.post')
def test_notify_emby_plugin_login(mock_post, mock_get):
@ -4780,50 +4754,6 @@ def test_notify_sendgrid_plugin(mock_post, mock_get):
cc=('abc@test.org', '!invalid')), plugins.NotifySendGrid)
@mock.patch('requests.get')
@mock.patch('requests.post')
def test_notify_slack_plugin(mock_post, mock_get):
"""
API: NotifySlack() Extra Checks
"""
# Disable Throttling to speed testing
plugins.NotifyBase.request_rate_per_sec = 0
# Initialize some generic (but valid) tokens
token_a = 'A' * 9
token_b = 'B' * 9
token_c = 'c' * 24
# Support strings
channels = 'chan1,#chan2,+id,@user,,,'
obj = plugins.NotifySlack(
token_a=token_a, token_b=token_b, token_c=token_c, targets=channels)
assert len(obj.channels) == 4
# Prepare Mock
mock_get.return_value = requests.Request()
mock_post.return_value = requests.Request()
mock_post.return_value.status_code = requests.codes.ok
mock_get.return_value.status_code = requests.codes.ok
# Missing first Token
with pytest.raises(TypeError):
plugins.NotifySlack(
token_a=None, token_b=token_b, token_c=token_c,
targets=channels)
# Test include_image
obj = plugins.NotifySlack(
token_a=token_a, token_b=token_b, token_c=token_c, targets=channels,
include_image=True)
# This call includes an image with it's payload:
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO) is True
@mock.patch('requests.get')
@mock.patch('requests.post')
def test_notify_pushbullet_plugin(mock_post, mock_get):

145
test/test_slack_plugin.py Normal file
View File

@ -0,0 +1,145 @@
# -*- coding: utf-8 -*-
#
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import six
import mock
import pytest
import requests
from apprise import plugins
from apprise import NotifyType
from json import dumps
# Disable logging for a cleaner testing output
import logging
logging.disable(logging.CRITICAL)
@mock.patch('requests.post')
def test_slack_oauth_access_token(mock_post):
"""
API: NotifySlack() OAuth Access Token Tests
"""
# Disable Throttling to speed testing
plugins.NotifyBase.request_rate_per_sec = 0
# Generate an invalid bot token
token = 'xo-invalid'
request = mock.Mock()
request.content = dumps({
'ok': True,
'message': '',
})
request.status_code = requests.codes.ok
# We'll fail to validate the access_token
with pytest.raises(TypeError):
plugins.NotifySlack(access_token=token)
# Generate a (valid) bot token
token = 'xoxb-1234-1234-abc124'
# Prepare Mock
mock_post.return_value = request
# Variation Initializations
obj = plugins.NotifySlack(access_token=token, targets='#apprise')
assert isinstance(obj, plugins.NotifySlack) is True
assert isinstance(obj.url(), six.string_types) is True
# apprise room was found
assert obj.send(body="test") is True
# Slack requests pay close attention to the response to determine
# if things go well... this is not a good JSON response:
request.content = '{'
# As a result, we'll fail to send our notification
assert obj.send(body="test") is False
request.content = dumps({
'ok': False,
'message': 'We failed',
})
# A response from Slack (even with a 200 response) still
# results in a failure:
assert obj.send(body="test") is False
# Handle exceptions reading our attachment from disk (should it happen)
mock_post.side_effect = OSError("Attachment Error")
mock_post.return_value = None
# We'll fail now because of an internal exception
assert obj.send(body="test") is False
@mock.patch('requests.post')
def test_slack_webhook(mock_post):
"""
API: NotifySlack() Webhook Tests
"""
# Disable Throttling to speed testing
plugins.NotifyBase.request_rate_per_sec = 0
# Prepare Mock
mock_post.return_value = requests.Request()
mock_post.return_value.status_code = requests.codes.ok
mock_post.return_value.content = dumps({
'ok': True,
})
# Initialize some generic (but valid) tokens
token_a = 'A' * 9
token_b = 'B' * 9
token_c = 'c' * 24
# Support strings
channels = 'chan1,#chan2,+BAK4K23G5,@user,,,'
obj = plugins.NotifySlack(
token_a=token_a, token_b=token_b, token_c=token_c, targets=channels)
assert len(obj.channels) == 4
# This call includes an image with it's payload:
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO) is True
# Missing first Token
with pytest.raises(TypeError):
plugins.NotifySlack(
token_a=None, token_b=token_b, token_c=token_c,
targets=channels)
# Test include_image
obj = plugins.NotifySlack(
token_a=token_a, token_b=token_b, token_c=token_c, targets=channels,
include_image=True)
# This call includes an image with it's payload:
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO) is True

View File

@ -736,7 +736,7 @@ def test_exclusive_match():
logic='match_me', data=data, match_all='match_me') is True
def test_apprise_validate_regex(tmpdir):
def test_apprise_validate_regex():
"""
API: Apprise() Validate Regex tests

BIN
test/var/apprise-test.gif Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 75 KiB

BIN
test/var/apprise-test.jpeg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 72 KiB

BIN
test/var/apprise-test.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 233 KiB