diff --git a/README.md b/README.md
index b29b8a71..3edb4bec 100644
--- a/README.md
+++ b/README.md
@@ -35,6 +35,7 @@ The table below identifies the services this tool supports and some example serv
| Notification Service | Service ID | Default Port | Example Syntax |
| -------------------- | ---------- | ------------ | -------------- |
| [Apprise API](https://github.com/caronc/apprise/wiki/Notify_apprise_api) | apprise:// or apprises:// | (TCP) 80 or 443 | apprise://hostname/Token
+| [AWS SES](https://github.com/caronc/apprise/wiki/Notify_ses) | ses:// | (TCP) 443 | ses://user@domain/AccessKeyID/AccessSecretKey/RegionName
ses://user@domain/AccessKeyID/AccessSecretKey/RegionName/email1/email2/emailN
| [Boxcar](https://github.com/caronc/apprise/wiki/Notify_boxcar) | boxcar:// | (TCP) 443 | boxcar://hostname
boxcar://hostname/@tag
boxcar://hostname/device_token
boxcar://hostname/device_token1/device_token2/device_tokenN
boxcar://hostname/@tag/@tag2/device_token
| [Discord](https://github.com/caronc/apprise/wiki/Notify_discord) | discord:// | (TCP) 443 | discord://webhook_id/webhook_token
discord://avatar@webhook_id/webhook_token
| [Emby](https://github.com/caronc/apprise/wiki/Notify_emby) | emby:// or embys:// | (TCP) 8096 | emby://user@hostname/
emby://user:password@hostname
diff --git a/apprise/plugins/NotifyEmail.py b/apprise/plugins/NotifyEmail.py
index 7bd89438..c8ebb90d 100644
--- a/apprise/plugins/NotifyEmail.py
+++ b/apprise/plugins/NotifyEmail.py
@@ -708,8 +708,8 @@ class NotifyEmail(NotifyBase):
attachment.url(privacy=True)))
with open(attachment.path, "rb") as abody:
- app = MIMEApplication(
- abody.read(), attachment.mimetype)
+ app = MIMEApplication(abody.read())
+ app.set_type(attachment.mimetype)
app.add_header(
'Content-Disposition',
diff --git a/apprise/plugins/NotifySES.py b/apprise/plugins/NotifySES.py
new file mode 100644
index 00000000..462ea5f8
--- /dev/null
+++ b/apprise/plugins/NotifySES.py
@@ -0,0 +1,950 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2021 Chris Caron
+# All rights reserved.
+#
+# This code is licensed under the MIT License.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files(the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions :
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+# API Information:
+# - https://docs.aws.amazon.com/ses/latest/APIReference/API_SendRawEmail.html
+#
+# AWS Credentials (access_key and secret_access_key)
+# - https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/\
+# setup-credentials.html
+# - https://docs.aws.amazon.com/toolkit-for-eclipse/v1/user-guide/\
+# setup-credentials.html
+#
+# Other systems write these credentials to:
+# - ~/.aws/credentials on Linux, macOS, or Unix
+# - C:\Users\USERNAME\.aws\credentials on Windows
+#
+#
+# To get A users access key ID and secret access key
+#
+# 1. Open the IAM console: https://console.aws.amazon.com/iam/home
+# 2. On the navigation menu, choose Users.
+# 3. Choose your IAM user name (not the check box).
+# 4. Open the Security credentials tab, and then choose:
+# Create Access key - Programmatic access
+# 5. To see the new access key, choose Show. Your credentials resemble
+# the following:
+# Access key ID: AKIAIOSFODNN7EXAMPLE
+# Secret access key: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
+#
+# To download the key pair, choose Download .csv file. Store the keys
+# The account requries this permssion to 'SES v2 : SendEmail' in order to
+# work
+#
+# To get the root users account (if you're logged in as that) you can
+# visit: https://console.aws.amazon.com/iam/home#/\
+# security_credentials$access_key
+#
+# This information is vital to work with SES
+
+
+# To use/test the service, i logged into the portal via:
+# - https://portal.aws.amazon.com
+#
+# Go to the dashboard of the Amazon SES (Simple Email Service)
+# 1. You must have a verified identity; click on that option and create one
+# if you don't already have one. Until it's verified, you won't be able to
+# do the next step.
+# 2. From here you'll be able to retrieve your ARN associated with your
+# identity you want Apprise to send emails on behalf. It might look
+# something like:
+# arn:aws:ses:us-east-2:133216123003:identity/user@example.com
+#
+# This is your ARN (Amazon Record Name)
+#
+#
+
+import re
+import hmac
+import base64
+import requests
+from hashlib import sha256
+from datetime import datetime
+from collections import OrderedDict
+from xml.etree import ElementTree
+from email.mime.text import MIMEText
+from email.mime.application import MIMEApplication
+from email.mime.multipart import MIMEMultipart
+from email.utils import formataddr
+from email.header import Header
+try:
+ # Python v3.x
+ from urllib.parse import quote
+
+except ImportError:
+ # Python v2.x
+ from urllib import quote
+
+from .NotifyBase import NotifyBase
+from ..URLBase import PrivacyMode
+from ..common import NotifyFormat
+from ..common import NotifyType
+from ..utils import parse_emails
+from ..utils import validate_regex
+from ..AppriseLocale import gettext_lazy as _
+from ..utils import is_email
+
+# Our Regin Identifier
+# support us-gov-west-1 syntax as well
+IS_REGION = re.compile(
+ r'^\s*(?P[a-z]{2})-(?P[a-z-]+?)-(?P[0-9]+)\s*$', re.I)
+
+# Extend HTTP Error Messages
+AWS_HTTP_ERROR_MAP = {
+ 403: 'Unauthorized - Invalid Access/Secret Key Combination.',
+}
+
+
+class NotifySES(NotifyBase):
+ """
+ A wrapper for AWS SES (Amazon Simple Email Service)
+ """
+
+ # The default descriptive name associated with the Notification
+ service_name = 'AWS Simple Email Service (SES)'
+
+ # The services URL
+ service_url = 'https://aws.amazon.com/ses/'
+
+ # The default secure protocol
+ secure_protocol = 'ses'
+
+ # A URL that takes you to the setup/help of the specific protocol
+ setup_url = 'https://github.com/caronc/apprise/wiki/Notify_ses'
+
+ # AWS is pretty good for handling data load so request limits
+ # can occur in much shorter bursts
+ request_rate_per_sec = 2.5
+
+ # Default Notify Format
+ notify_format = NotifyFormat.HTML
+
+ # Define object templates
+ templates = (
+ '{schema}://{from_email}/{access_key_id}/{secret_access_key}/'
+ '{region}/{targets}',
+ '{schema}://{from_email}/{access_key_id}/{secret_access_key}/'
+ '{region}',
+ )
+
+ # Define our template tokens
+ template_tokens = dict(NotifyBase.template_tokens, **{
+ 'from_email': {
+ 'name': _('From Email'),
+ 'type': 'string',
+ 'map_to': 'from_addr',
+ },
+ 'access_key_id': {
+ 'name': _('Access Key ID'),
+ 'type': 'string',
+ 'private': True,
+ 'required': True,
+ },
+ 'secret_access_key': {
+ 'name': _('Secret Access Key'),
+ 'type': 'string',
+ 'private': True,
+ 'required': True,
+ },
+ 'region': {
+ 'name': _('Region'),
+ 'type': 'string',
+ 'regex': (r'^[a-z]{2}-[a-z-]+?-[0-9]+$', 'i'),
+ 'map_to': 'region_name',
+ },
+ 'targets': {
+ 'name': _('Target Emails'),
+ 'type': 'list:string',
+ },
+ })
+
+ # Define our template arguments
+ template_args = dict(NotifyBase.template_args, **{
+ 'to': {
+ 'alias_of': 'targets',
+ },
+ 'from': {
+ 'alias_of': 'from_email',
+ },
+ 'reply': {
+ 'name': _('Reply To Email'),
+ 'type': 'string',
+ 'map_to': 'reply_to',
+ },
+ 'name': {
+ 'name': _('From Name'),
+ 'type': 'string',
+ 'map_to': 'from_name',
+ },
+ 'cc': {
+ 'name': _('Carbon Copy'),
+ 'type': 'list:string',
+ },
+ 'bcc': {
+ 'name': _('Blind Carbon Copy'),
+ 'type': 'list:string',
+ },
+ 'access': {
+ 'alias_of': 'access_key_id',
+ },
+ 'secret': {
+ 'alias_of': 'secret_access_key',
+ },
+ 'region': {
+ 'alias_of': 'region',
+ },
+ })
+
+ def __init__(self, access_key_id, secret_access_key, region_name,
+ reply_to=None, from_addr=None, from_name=None, targets=None,
+ cc=None, bcc=None, **kwargs):
+ """
+ Initialize Notify AWS SES Object
+ """
+ super(NotifySES, self).__init__(**kwargs)
+
+ # Store our AWS API Access Key
+ self.aws_access_key_id = validate_regex(access_key_id)
+ if not self.aws_access_key_id:
+ msg = 'An invalid AWS Access Key ID was specified.'
+ self.logger.warning(msg)
+ raise TypeError(msg)
+
+ # Store our AWS API Secret Access key
+ self.aws_secret_access_key = validate_regex(secret_access_key)
+ if not self.aws_secret_access_key:
+ msg = 'An invalid AWS Secret Access Key ' \
+ '({}) was specified.'.format(secret_access_key)
+ self.logger.warning(msg)
+ raise TypeError(msg)
+
+ # Acquire our AWS Region Name:
+ # eg. us-east-1, cn-north-1, us-west-2, ...
+ self.aws_region_name = validate_regex(
+ region_name, *self.template_tokens['region']['regex'])
+ if not self.aws_region_name:
+ msg = 'An invalid AWS Region ({}) was specified.'.format(
+ region_name)
+ self.logger.warning(msg)
+ raise TypeError(msg)
+
+ # Acquire Email 'To'
+ self.targets = list()
+
+ # Acquire Carbon Copies
+ self.cc = set()
+
+ # Acquire Blind Carbon Copies
+ self.bcc = set()
+
+ # For tracking our email -> name lookups
+ self.names = {}
+
+ # Set our notify_url based on our region
+ self.notify_url = 'https://email.{}.amazonaws.com'\
+ .format(self.aws_region_name)
+
+ # AWS Service Details
+ self.aws_service_name = 'ses'
+ self.aws_canonical_uri = '/'
+
+ # AWS Authentication Details
+ self.aws_auth_version = 'AWS4'
+ self.aws_auth_algorithm = 'AWS4-HMAC-SHA256'
+ self.aws_auth_request = 'aws4_request'
+
+ # Get our From username (if specified)
+ self.from_name = from_name
+
+ if from_addr:
+ self.from_addr = from_addr
+
+ else:
+ # Get our from email address
+ self.from_addr = '{user}@{host}'.format(
+ user=self.user, host=self.host) if self.user else None
+
+ if not (self.from_addr and is_email(self.from_addr)):
+ msg = 'An invalid AWS From ({}) was specified.'.format(
+ '{user}@{host}'.format(user=self.user, host=self.host))
+ self.logger.warning(msg)
+ raise TypeError(msg)
+
+ self.reply_to = None
+ if reply_to:
+ result = is_email(reply_to)
+ if not result:
+ msg = 'An invalid AWS Reply To ({}) was specified.'.format(
+ '{user}@{host}'.format(user=self.user, host=self.host))
+ self.logger.warning(msg)
+ raise TypeError(msg)
+
+ self.reply_to = (
+ result['name'] if result['name'] else False,
+ result['full_email'])
+
+ if targets:
+ # Validate recipients (to:) and drop bad ones:
+ for recipient in parse_emails(targets):
+ result = is_email(recipient)
+ if result:
+ self.targets.append(
+ (result['name'] if result['name'] else False,
+ result['full_email']))
+ continue
+
+ self.logger.warning(
+ 'Dropped invalid To email '
+ '({}) specified.'.format(recipient),
+ )
+
+ else:
+ # If our target email list is empty we want to add ourselves to it
+ self.targets.append(
+ (self.from_name if self.from_name else False, self.from_addr))
+
+ # Validate recipients (cc:) and drop bad ones:
+ for recipient in parse_emails(cc):
+ email = is_email(recipient)
+ if email:
+ self.cc.add(email['full_email'])
+
+ # Index our name (if one exists)
+ self.names[email['full_email']] = \
+ email['name'] if email['name'] else False
+ continue
+
+ self.logger.warning(
+ 'Dropped invalid Carbon Copy email '
+ '({}) specified.'.format(recipient),
+ )
+
+ # Validate recipients (bcc:) and drop bad ones:
+ for recipient in parse_emails(bcc):
+ email = is_email(recipient)
+ if email:
+ self.bcc.add(email['full_email'])
+
+ # Index our name (if one exists)
+ self.names[email['full_email']] = \
+ email['name'] if email['name'] else False
+ continue
+
+ self.logger.warning(
+ 'Dropped invalid Blind Carbon Copy email '
+ '({}) specified.'.format(recipient),
+ )
+
+ return
+
+ def send(self, body, title='', notify_type=NotifyType.INFO, attach=None,
+ **kwargs):
+ """
+ wrapper to send_notification since we can alert more then one channel
+ """
+
+ if not self.targets:
+ # There is no one to email; we're done
+ self.logger.warning(
+ 'There are no SES email recipients to notify')
+ return False
+
+ # error tracking (used for function return)
+ has_error = False
+
+ # Initialize our default from name
+ from_name = self.from_name if self.from_name \
+ else self.reply_to[0] if self.reply_to and \
+ self.reply_to[0] else self.app_desc
+
+ reply_to = (
+ from_name, self.from_addr
+ if not self.reply_to else self.reply_to[1])
+
+ # Create a copy of the targets list
+ emails = list(self.targets)
+ while len(emails):
+ # Get our email to notify
+ to_name, to_addr = emails.pop(0)
+
+ # Strip target out of cc list if in To or Bcc
+ cc = (self.cc - self.bcc - set([to_addr]))
+
+ # Strip target out of bcc list if in To
+ bcc = (self.bcc - set([to_addr]))
+
+ try:
+ # Format our cc addresses to support the Name field
+ cc = [formataddr(
+ (self.names.get(addr, False), addr), charset='utf-8')
+ for addr in cc]
+
+ # Format our bcc addresses to support the Name field
+ bcc = [formataddr(
+ (self.names.get(addr, False), addr), charset='utf-8')
+ for addr in bcc]
+
+ except TypeError:
+ # Python v2.x Support (no charset keyword)
+ # Format our cc addresses to support the Name field
+ cc = [formataddr( # pragma: no branch
+ (self.names.get(addr, False), addr)) for addr in cc]
+
+ # Format our bcc addresses to support the Name field
+ bcc = [formataddr( # pragma: no branch
+ (self.names.get(addr, False), addr)) for addr in bcc]
+
+ self.logger.debug('Email From: {} <{}>'.format(
+ quote(reply_to[0], ' '),
+ quote(reply_to[1], '@ ')))
+
+ self.logger.debug('Email To: {}'.format(to_addr))
+ if cc:
+ self.logger.debug('Email Cc: {}'.format(', '.join(cc)))
+ if bcc:
+ self.logger.debug('Email Bcc: {}'.format(', '.join(bcc)))
+
+ # Prepare Email Message
+ if self.notify_format == NotifyFormat.HTML:
+ content = MIMEText(body, 'html', 'utf-8')
+
+ else:
+ content = MIMEText(body, 'plain', 'utf-8')
+
+ # Create a Multipart container if there is an attachment
+ base = MIMEMultipart() if attach else content
+
+ base['Subject'] = Header(title, 'utf-8')
+ try:
+ base['From'] = formataddr(
+ (from_name if from_name else False, self.from_addr),
+ charset='utf-8')
+ base['To'] = formataddr((to_name, to_addr), charset='utf-8')
+ if reply_to[1] != self.from_addr:
+ base['Reply-To'] = formataddr(reply_to, charset='utf-8')
+
+ except TypeError:
+ # Python v2.x Support (no charset keyword)
+ base['From'] = formataddr(
+ (from_name if from_name else False, self.from_addr))
+ base['To'] = formataddr((to_name, to_addr))
+ if reply_to[1] != self.from_addr:
+ base['Reply-To'] = formataddr(reply_to)
+
+ base['Cc'] = ','.join(cc)
+ base['Date'] = \
+ datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S +0000")
+ base['X-Application'] = self.app_id
+
+ if attach:
+ # First attach our body to our content as the first element
+ base.attach(content)
+
+ # Now store our attachments
+ for attachment in attach:
+ if not attachment:
+ # We could not load the attachment; take an early
+ # exit since this isn't what the end user wanted
+
+ # We could not access the attachment
+ self.logger.error(
+ 'Could not access attachment {}.'.format(
+ attachment.url(privacy=True)))
+
+ return False
+
+ self.logger.debug(
+ 'Preparing Email attachment {}'.format(
+ attachment.url(privacy=True)))
+
+ with open(attachment.path, "rb") as abody:
+ app = MIMEApplication(abody.read())
+ app.set_type(attachment.mimetype)
+
+ app.add_header(
+ 'Content-Disposition',
+ 'attachment; filename="{}"'.format(
+ Header(attachment.name, 'utf-8')),
+ )
+
+ base.attach(app)
+
+ # Prepare our payload object
+ payload = {
+ 'Action': 'SendRawEmail',
+ 'Version': '2010-12-01',
+ 'RawMessage.Data': base64.b64encode(
+ base.as_string().encode('utf-8')).decode('utf-8')
+ }
+
+ for no, email in enumerate(([to_addr] + bcc + cc), start=1):
+ payload['Destinations.member.{}'.format(no)] = email
+
+ # Specify from address
+ payload['Source'] = '{} <{}>'.format(
+ quote(from_name, ' '),
+ quote(self.from_addr, '@ '))
+
+ (result, response) = self._post(payload=payload, to=to_addr)
+ if not result:
+ # Mark our failure
+ has_error = True
+ continue
+
+ return not has_error
+
+ def _post(self, payload, to):
+ """
+ Wrapper to request.post() to manage it's response better and make
+ the send() function cleaner and easier to maintain.
+
+ This function returns True if the _post was successful and False
+ if it wasn't.
+ """
+
+ # Always call throttle before any remote server i/o is made; for AWS
+ # time plays a huge factor in the headers being sent with the payload.
+ # So for AWS (SES) requests we must throttle before they're generated
+ # and not directly before the i/o call like other notification
+ # services do.
+ self.throttle()
+
+ # Convert our payload from a dict() into a urlencoded string
+ payload = NotifySES.urlencode(payload)
+
+ # Prepare our Notification URL
+ # Prepare our AWS Headers based on our payload
+ headers = self.aws_prepare_request(payload)
+
+ self.logger.debug('AWS SES POST URL: %s (cert_verify=%r)' % (
+ self.notify_url, self.verify_certificate,
+ ))
+ self.logger.debug('AWS SES Payload (%d bytes)', len(payload))
+
+ try:
+ r = requests.post(
+ self.notify_url,
+ data=payload,
+ headers=headers,
+ verify=self.verify_certificate,
+ timeout=self.request_timeout,
+ )
+
+ if r.status_code != requests.codes.ok:
+ # We had a problem
+ status_str = \
+ NotifySES.http_response_code_lookup(
+ r.status_code, AWS_HTTP_ERROR_MAP)
+
+ self.logger.warning(
+ 'Failed to send AWS SES notification to {}: '
+ '{}{}error={}.'.format(
+ to,
+ status_str,
+ ', ' if status_str else '',
+ r.status_code))
+
+ self.logger.debug('Response Details:\r\n{}'.format(r.content))
+
+ return (False, NotifySES.aws_response_to_dict(r.text))
+
+ else:
+ self.logger.info(
+ 'Sent AWS SES notification to "%s".' % (to))
+
+ except requests.RequestException as e:
+ self.logger.warning(
+ 'A Connection error occurred sending AWS SES '
+ 'notification to "%s".' % (to),
+ )
+ self.logger.debug('Socket Exception: %s' % str(e))
+ return (False, NotifySES.aws_response_to_dict(None))
+
+ return (True, NotifySES.aws_response_to_dict(r.text))
+
+ def aws_prepare_request(self, payload, reference=None):
+ """
+ Takes the intended payload and returns the headers for it.
+
+ The payload is presumed to have been already urlencoded()
+
+ """
+
+ # Define our AWS SES header
+ headers = {
+ 'User-Agent': self.app_id,
+ 'Content-Type': 'application/x-www-form-urlencoded; charset=utf-8',
+
+ # Populated below
+ 'Content-Length': 0,
+ 'Authorization': None,
+ 'X-Amz-Date': None,
+ }
+
+ # Get a reference time (used for header construction)
+ reference = datetime.utcnow()
+
+ # Provide Content-Length
+ headers['Content-Length'] = str(len(payload))
+
+ # Amazon Date Format
+ amzdate = reference.strftime('%Y%m%dT%H%M%SZ')
+ headers['X-Amz-Date'] = amzdate
+
+ # Credential Scope
+ scope = '{date}/{region}/{service}/{request}'.format(
+ date=reference.strftime('%Y%m%d'),
+ region=self.aws_region_name,
+ service=self.aws_service_name,
+ request=self.aws_auth_request,
+ )
+
+ # Similar to headers; but a subset. keys must be lowercase
+ signed_headers = OrderedDict([
+ ('content-type', headers['Content-Type']),
+ ('host', 'email.{region}.amazonaws.com'.format(
+ region=self.aws_region_name)),
+ ('x-amz-date', headers['X-Amz-Date']),
+ ])
+
+ #
+ # Build Canonical Request Object
+ #
+ canonical_request = '\n'.join([
+ # Method
+ u'POST',
+
+ # URL
+ self.aws_canonical_uri,
+
+ # Query String (none set for POST)
+ '',
+
+ # Header Content (must include \n at end!)
+ # All entries except characters in amazon date must be
+ # lowercase
+ '\n'.join(['%s:%s' % (k, v)
+ for k, v in signed_headers.items()]) + '\n',
+
+ # Header Entries (in same order identified above)
+ ';'.join(signed_headers.keys()),
+
+ # Payload
+ sha256(payload.encode('utf-8')).hexdigest(),
+ ])
+
+ # Prepare Unsigned Signature
+ to_sign = '\n'.join([
+ self.aws_auth_algorithm,
+ amzdate,
+ scope,
+ sha256(canonical_request.encode('utf-8')).hexdigest(),
+ ])
+
+ # Our Authorization header
+ headers['Authorization'] = ', '.join([
+ '{algorithm} Credential={key}/{scope}'.format(
+ algorithm=self.aws_auth_algorithm,
+ key=self.aws_access_key_id,
+ scope=scope,
+ ),
+ 'SignedHeaders={signed_headers}'.format(
+ signed_headers=';'.join(signed_headers.keys()),
+ ),
+ 'Signature={signature}'.format(
+ signature=self.aws_auth_signature(to_sign, reference)
+ ),
+ ])
+
+ return headers
+
+ def aws_auth_signature(self, to_sign, reference):
+ """
+ Generates a AWS v4 signature based on provided payload
+ which should be in the form of a string.
+ """
+
+ def _sign(key, msg, to_hex=False):
+ """
+ Perform AWS Signing
+ """
+ if to_hex:
+ return hmac.new(key, msg.encode('utf-8'), sha256).hexdigest()
+ return hmac.new(key, msg.encode('utf-8'), sha256).digest()
+
+ _date = _sign((
+ self.aws_auth_version +
+ self.aws_secret_access_key).encode('utf-8'),
+ reference.strftime('%Y%m%d'))
+
+ _region = _sign(_date, self.aws_region_name)
+ _service = _sign(_region, self.aws_service_name)
+ _signed = _sign(_service, self.aws_auth_request)
+ return _sign(_signed, to_sign, to_hex=True)
+
+ @staticmethod
+ def aws_response_to_dict(aws_response):
+ """
+ Takes an AWS Response object as input and returns it as a dictionary
+ but not befor extracting out what is useful to us first.
+
+ eg:
+ IN:
+
+
+
+
+ 010f017d87656ee2-a2ea291f-79ea-
+ 44f3-9d25-00d041de3007-000000
+
+
+ 7abb454e-904b-4e46-a23c-2f4d2fc127a6
+
+
+
+ OUT:
+ {
+ 'type': 'SendRawEmailResponse',
+ 'message_id': '010f017d87656ee2-a2ea291f-79ea-
+ 44f3-9d25-00d041de3007-000000',
+ 'request_id': '7abb454e-904b-4e46-a23c-2f4d2fc127a6',
+ }
+ """
+
+ # Define ourselves a set of directives we want to keep if found and
+ # then identify the value we want to map them to in our response
+ # object
+ aws_keep_map = {
+ 'RequestId': 'request_id',
+ 'MessageId': 'message_id',
+
+ # Error Message Handling
+ 'Type': 'error_type',
+ 'Code': 'error_code',
+ 'Message': 'error_message',
+ }
+
+ # A default response object that we'll manipulate as we pull more data
+ # from our AWS Response object
+ response = {
+ 'type': None,
+ 'request_id': None,
+ 'message_id': None,
+ }
+
+ try:
+ # we build our tree, but not before first eliminating any
+ # reference to namespacing (if present) as it makes parsing
+ # the tree so much easier.
+ root = ElementTree.fromstring(
+ re.sub(' xmlns="[^"]+"', '', aws_response, count=1))
+
+ # Store our response tag object name
+ response['type'] = str(root.tag)
+
+ def _xml_iter(root, response):
+ if len(root) > 0:
+ for child in root:
+ # use recursion to parse everything
+ _xml_iter(child, response)
+
+ elif root.tag in aws_keep_map.keys():
+ response[aws_keep_map[root.tag]] = (root.text).strip()
+
+ # Recursivly iterate over our AWS Response to extract the
+ # fields we're interested in in efforts to populate our response
+ # object.
+ _xml_iter(root, response)
+
+ except (ElementTree.ParseError, TypeError):
+ # bad data just causes us to generate a bad response
+ pass
+
+ return response
+
+ def url(self, privacy=False, *args, **kwargs):
+ """
+ Returns the URL built dynamically based on specified arguments.
+ """
+
+ # Acquire any global URL parameters
+ params = self.url_parameters(privacy=privacy, *args, **kwargs)
+
+ if self.from_name is not None:
+ # from_name specified; pass it back on the url
+ params['name'] = self.from_name
+
+ if self.cc:
+ # Handle our Carbon Copy Addresses
+ params['cc'] = ','.join(
+ ['{}{}'.format(
+ '' if not e not in self.names
+ else '{}:'.format(self.names[e]), e) for e in self.cc])
+
+ if self.bcc:
+ # Handle our Blind Carbon Copy Addresses
+ params['bcc'] = ','.join(self.bcc)
+
+ if self.reply_to:
+ # Handle our reply to address
+ params['reply'] = '{} <{}>'.format(*self.reply_to) \
+ if self.reply_to[0] else self.reply_to[1]
+
+ # a simple boolean check as to whether we display our target emails
+ # or not
+ has_targets = \
+ not (len(self.targets) == 1
+ and self.targets[0][1] == self.from_addr)
+
+ return '{schema}://{from_addr}/{key_id}/{key_secret}/{region}/' \
+ '{targets}/?{params}'.format(
+ schema=self.secure_protocol,
+ from_addr=NotifySES.quote(self.from_addr, safe='@'),
+ key_id=self.pprint(self.aws_access_key_id, privacy, safe=''),
+ key_secret=self.pprint(
+ self.aws_secret_access_key, privacy,
+ mode=PrivacyMode.Secret, safe=''),
+ region=NotifySES.quote(self.aws_region_name, safe=''),
+ targets='' if not has_targets else '/'.join(
+ [NotifySES.quote('{}{}'.format(
+ '' if not e[0] else '{}:'.format(e[0]), e[1]),
+ safe='') for e in self.targets]),
+ params=NotifySES.urlencode(params),
+ )
+
+ @staticmethod
+ def parse_url(url):
+ """
+ Parses the URL and returns enough arguments that can allow
+ us to re-instantiate this object.
+
+ """
+ results = NotifyBase.parse_url(url, verify_host=False)
+ if not results:
+ # We're done early as we couldn't load the results
+ return results
+
+ # Get our entries; split_path() looks after unquoting content for us
+ # by default
+ entries = NotifySES.split_path(results['fullpath'])
+
+ # The AWS Access Key ID is stored in the first entry
+ access_key_id = entries.pop(0) if entries else None
+
+ # Our AWS Access Key Secret contains slashes in it which unfortunately
+ # means it is of variable length after the hostname. Since we require
+ # that the user provides the region code, we intentionally use this
+ # as our delimiter to detect where our Secret is.
+ secret_access_key = None
+ region_name = None
+
+ # We need to iterate over each entry in the fullpath and find our
+ # region. Once we get there we stop and build our secret from our
+ # accumulated data.
+ secret_access_key_parts = list()
+
+ # Section 1: Get Region and Access Secret
+ index = 0
+ for index, entry in enumerate(entries, start=1):
+
+ # Are we at the region yet?
+ result = IS_REGION.match(entry)
+ if result:
+ # Ensure region is nicely formatted
+ region_name = "{country}-{area}-{no}".format(
+ country=result.group('country').lower(),
+ area=result.group('area').lower(),
+ no=result.group('no'),
+ )
+
+ # We're done with Section 1 of our url (the credentials)
+ break
+
+ elif is_email(entry):
+ # We're done with Section 1 of our url (the credentials)
+ index -= 1
+ break
+
+ # Store our secret parts
+ secret_access_key_parts.append(entry)
+
+ # Prepare our Secret Access Key
+ secret_access_key = '/'.join(secret_access_key_parts) \
+ if secret_access_key_parts else None
+
+ # Section 2: Get our Recipients (basically all remaining entries)
+ results['targets'] = entries[index:]
+
+ if 'name' in results['qsd'] and len(results['qsd']['name']):
+ # Extract from name to associate with from address
+ results['from_name'] = \
+ NotifySES.unquote(results['qsd']['name'])
+
+ # Handle 'to' email address
+ if 'to' in results['qsd'] and len(results['qsd']['to']):
+ results['targets'].append(results['qsd']['to'])
+
+ # Handle Carbon Copy Addresses
+ if 'cc' in results['qsd'] and len(results['qsd']['cc']):
+ results['cc'] = NotifySES.parse_list(results['qsd']['cc'])
+
+ # Handle Blind Carbon Copy Addresses
+ if 'bcc' in results['qsd'] and len(results['qsd']['bcc']):
+ results['bcc'] = NotifySES.parse_list(results['qsd']['bcc'])
+
+ # Handle From Address handling
+ if 'from' in results['qsd'] and len(results['qsd']['from']):
+ results['from_addr'] = \
+ NotifySES.unquote(results['qsd']['from'])
+
+ # Handle Reply To Address
+ if 'reply' in results['qsd'] and len(results['qsd']['reply']):
+ results['reply_to'] = \
+ NotifySES.unquote(results['qsd']['reply'])
+
+ # Handle secret_access_key over-ride
+ if 'secret' in results['qsd'] and len(results['qsd']['secret']):
+ results['secret_access_key'] = \
+ NotifySES.unquote(results['qsd']['secret'])
+ else:
+ results['secret_access_key'] = secret_access_key
+
+ # Handle access key id over-ride
+ if 'access' in results['qsd'] and len(results['qsd']['access']):
+ results['access_key_id'] = \
+ NotifySES.unquote(results['qsd']['access'])
+ else:
+ results['access_key_id'] = access_key_id
+
+ # Handle region name id over-ride
+ if 'region' in results['qsd'] and len(results['qsd']['region']):
+ results['region_name'] = \
+ NotifySES.unquote(results['qsd']['region'])
+ else:
+ results['region_name'] = region_name
+
+ # Return our result set
+ return results
diff --git a/apprise/plugins/NotifySNS.py b/apprise/plugins/NotifySNS.py
index 3cc15a56..1cb6ff97 100644
--- a/apprise/plugins/NotifySNS.py
+++ b/apprise/plugins/NotifySNS.py
@@ -56,7 +56,7 @@ IS_TOPIC = re.compile(r'^#?(?P[A-Za-z0-9_-]+)\s*$')
# users of this product search though this Access Key Secret and escape all
# of the forward slashes!
IS_REGION = re.compile(
- r'^\s*(?P[a-z]{2})-(?P[a-z]+)-(?P[0-9]+)\s*$', re.I)
+ r'^\s*(?P[a-z]{2})-(?P[a-z-]+?)-(?P[0-9]+)\s*$', re.I)
# Extend HTTP Error Messages
AWS_HTTP_ERROR_MAP = {
@@ -116,7 +116,7 @@ class NotifySNS(NotifyBase):
'name': _('Region'),
'type': 'string',
'required': True,
- 'regex': (r'^[a-z]{2}-[a-z]+-[0-9]+$', 'i'),
+ 'regex': (r'^[a-z]{2}-[a-z-]+?-[0-9]+$', 'i'),
'map_to': 'region_name',
},
'target_phone_no': {
@@ -143,6 +143,15 @@ class NotifySNS(NotifyBase):
'to': {
'alias_of': 'targets',
},
+ 'access': {
+ 'alias_of': 'access_key_id',
+ },
+ 'secret': {
+ 'alias_of': 'secret_access_key',
+ },
+ 'region': {
+ 'alias_of': 'region',
+ },
})
def __init__(self, access_key_id, secret_access_key, region_name,
@@ -651,10 +660,26 @@ class NotifySNS(NotifyBase):
results['targets'] += \
NotifySNS.parse_list(results['qsd']['to'])
- # Store our other detected data (if at all)
- results['region_name'] = region_name
- results['access_key_id'] = access_key_id
- results['secret_access_key'] = secret_access_key
+ # Handle secret_access_key over-ride
+ if 'secret' in results['qsd'] and len(results['qsd']['secret']):
+ results['secret_access_key'] = \
+ NotifySNS.unquote(results['qsd']['secret'])
+ else:
+ results['secret_access_key'] = secret_access_key
+
+ # Handle access key id over-ride
+ if 'access' in results['qsd'] and len(results['qsd']['access']):
+ results['access_key_id'] = \
+ NotifySNS.unquote(results['qsd']['access'])
+ else:
+ results['access_key_id'] = access_key_id
+
+ # Handle region name id over-ride
+ if 'region' in results['qsd'] and len(results['qsd']['region']):
+ results['region_name'] = \
+ NotifySNS.unquote(results['qsd']['region'])
+ else:
+ results['region_name'] = region_name
# Return our result set
return results
diff --git a/packaging/redhat/python-apprise.spec b/packaging/redhat/python-apprise.spec
index 51c162ad..5ebc27f7 100644
--- a/packaging/redhat/python-apprise.spec
+++ b/packaging/redhat/python-apprise.spec
@@ -47,15 +47,15 @@ Apprise is a Python package for simplifying access to all of the different
notification services that are out there. Apprise opens the door and makes
it easy to access:
-Apprise API, Boxcar, ClickSend, DingTalk, Discord, E-Mail, Emby, Faast, FCM,
-Flock, Gitter, Google Chat, Gotify, Growl, Home Assistant, IFTTT, Join,
-Kavenegar, KODI, Kumulos, LaMetric, MacOSX, Mailgun, Mattermost, Matrix,
-Microsoft Windows, Microsoft Teams, MessageBird, MQTT, MSG91, MyAndroid, Nexmo,
-Nextcloud, Notica, Notifico, Office365, OneSignal, Opsgenie, ParsePlatform,
-PopcornNotify, Prowl, Pushalot, PushBullet, Pushjet, Pushover, PushSafer,
-Reddit, Rocket.Chat, SendGrid, SimplePush, Sinch, Slack, SMTP2Go, Spontit,
-SparkPost, Super Toasty, Streamlabs, Stride, Syslog, Techulus Push, Telegram,
-Twilio, Twitter, Twist, XBMC, XMPP, Webex Teams}
+Apprise API, AWS SES, AWS SNS, Boxcar, ClickSend, DingTalk, Discord, E-Mail,
+Emby, Faast, FCM, Flock, Gitter, Google Chat, Gotify, Growl, Home Assistant,
+IFTTT, Join, Kavenegar, KODI, Kumulos, LaMetric, MacOSX, Mailgun, Mattermost,
+Matrix, Microsoft Windows, Microsoft Teams, MessageBird, MQTT, MSG91, MyAndroid,
+Nexmo, Nextcloud, Notica, Notifico, Office365, OneSignal, Opsgenie,
+ParsePlatform, PopcornNotify, Prowl, Pushalot, PushBullet, Pushjet, Pushover,
+PushSafer, Reddit, Rocket.Chat, SendGrid, SimplePush, Sinch, Slack, SMTP2Go,
+Spontit, SparkPost, Super Toasty, Streamlabs, Stride, Syslog, Techulus Push,
+Telegram, Twilio, Twitter, Twist, XBMC, XMPP, Webex Teams}
Name: python-%{pypi_name}
Version: 0.9.6
diff --git a/setup.py b/setup.py
index 64b17551..1c846616 100755
--- a/setup.py
+++ b/setup.py
@@ -69,7 +69,7 @@ setup(
long_description_content_type='text/markdown',
cmdclass=cmdclass,
url='https://github.com/caronc/apprise',
- keywords='Push Notifications Alerts Email AWS SNS Boxcar ClickSend '
+ keywords='Push Notifications Alerts Email AWS SES SNS Boxcar ClickSend '
'Dingtalk Discord Dbus Emby Faast FCM Flock Gitter Gnome Google Chat '
'Gotify Growl Home Assistant IFTTT Join Kavenegar KODI Kumulos '
'LaMetric MacOS Mailgun Matrix Mattermost MessageBird MQTT MSG91 '
diff --git a/test/helpers/rest.py b/test/helpers/rest.py
index 1f17342a..71cede28 100644
--- a/test/helpers/rest.py
+++ b/test/helpers/rest.py
@@ -245,8 +245,11 @@ class AppriseURLTester(object):
if privacy_url:
# Assess that our privacy url is as expected
- assert obj.url(
- privacy=True).startswith(privacy_url)
+ if not obj.url(privacy=True).startswith(privacy_url):
+ raise AssertionError(
+ "Privacy URL: '{}' != expected '{}'".format(
+ obj.url(privacy=True)[:len(privacy_url)],
+ privacy_url))
if url_matches:
# Assess that our URL matches a set regex
diff --git a/test/test_plugin_ses.py b/test/test_plugin_ses.py
new file mode 100644
index 00000000..2cfca266
--- /dev/null
+++ b/test/test_plugin_ses.py
@@ -0,0 +1,416 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2021 Chris Caron
+# All rights reserved.
+#
+# This code is licensed under the MIT License.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files(the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions :
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+import os
+import mock
+import pytest
+import requests
+from apprise import Apprise
+from apprise import AppriseAttachment
+from apprise import plugins
+from helpers import AppriseURLTester
+
+# Disable logging for a cleaner testing output
+import logging
+logging.disable(logging.CRITICAL)
+
+# Attachment Directory
+TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var')
+
+AWS_SES_GOOD_RESPONSE = \
+ """
+
+
+
+ 010f017d87656ee2-a2ea291f-79ea-
+ 44f3-9d25-00d041de3007-000000
+
+
+ 7abb454e-904b-4e46-a23c-2f4d2fc127a6
+
+
+ """
+
+TEST_ACCESS_KEY_ID = 'AHIAJGNT76XIMXDBIJYA'
+TEST_ACCESS_KEY_SECRET = 'bu1dHSdO22pfaaVy/wmNsdljF4C07D3bndi9PQJ9'
+TEST_REGION = 'us-east-2'
+
+# Our Testing URLs
+apprise_url_tests = (
+ ('ses://', {
+ 'instance': TypeError,
+ }),
+ ('ses://:@/', {
+ 'instance': TypeError,
+ }),
+ ('ses://user@example.com/T1JJ3T3L2', {
+ # Just Token 1 provided
+ 'instance': TypeError,
+ }),
+ ('ses://user@example.com/T1JJ3TD4JD/TIiajkdnlazk7FQ/', {
+ # Missing a region
+ 'instance': TypeError,
+ }),
+ ('ses://T1JJ3T3L2/A1BRTD4JD/TIiajkdnlazkcevi7FQ/us-west-2', {
+ # No email
+ 'instance': TypeError,
+ }),
+ ('ses://user@example.com/T1JJ3TD4JD/TIiajkdnlazk7FQ/'
+ 'user2@example.com', {
+ # Missing a region (but has email)
+ 'instance': TypeError,
+ }),
+ ('ses://user@example.com/T1JJ3T3L2/A1BRTD4JD/TIiajkdnlazkcevi7FQ/'
+ 'us-west-2?reply=invalid-email', {
+ # An invalid reply-to address
+ 'instance': TypeError,
+
+ # Our response expected server response
+ 'requests_response_text': AWS_SES_GOOD_RESPONSE,
+ }),
+ ('ses://user@example.com/T1JJ3T3L2/A1BRTD4JD/TIiajkdnlazkcevi7FQ/'
+ 'us-west-2', {
+ # we have a valid URL and we'll use our own email as a target
+ 'instance': plugins.NotifySES,
+
+ # Our response expected server response
+ 'requests_response_text': AWS_SES_GOOD_RESPONSE,
+ }),
+ ('ses://user@example.com/T1JJ3TD4JD/TIiajkdnlazk7FQ/us-west-2/'
+ 'user2@example.ca/user3@example.eu', {
+ # Multi Email Suppport
+ 'instance': plugins.NotifySES,
+
+ # Our response expected server response
+ 'requests_response_text': AWS_SES_GOOD_RESPONSE,
+
+ # Our expected url(privacy=True) startswith() response:
+ 'privacy_url': 'ses://user@example.com/T...D/****/us-west-2',
+ }),
+ ('ses://user@example.com/T1JJ3T3L2/A1BRTD4JD/TIiajkdnlaevi7FQ/us-east-1'
+ '?to=user2@example.ca', {
+ # leveraging to: keyword
+ 'instance': plugins.NotifySES,
+
+ # Our response expected server response
+ 'requests_response_text': AWS_SES_GOOD_RESPONSE,
+ }),
+ ('ses://?from=user@example.com®ion=us-west-2&access=T1JJ3T3L2'
+ '&secret=A1BRTD4JD/TIiajkdnlaevi7FQ'
+ '&reply=No One '
+ '&bcc=user.bcc@example.com,user2.bcc@example.com,invalid-email'
+ '&cc=user.cc@example.com,user2.cc@example.com,invalid-email'
+ '&to=user2@example.ca', {
+ # leveraging a ton of our keywords
+ # We also test invlid emails specified on the bcc and cc list
+ 'instance': plugins.NotifySES,
+
+ # Our response expected server response
+ 'requests_response_text': AWS_SES_GOOD_RESPONSE,
+ }),
+ ('ses://user@example.com/T1JJ3T3L2/A1BRTD4JD/TIiacevi7FQ/us-west-2/'
+ '?name=From%20Name&to=user2@example.ca,invalid-email', {
+ # leveraging a ton of our keywords
+ 'instance': plugins.NotifySES,
+
+ # Our response expected server response
+ 'requests_response_text': AWS_SES_GOOD_RESPONSE,
+ }),
+ ('ses://user@example.com/T1JJ3T3L2/A1BRTD4JD/TIiacevi7FQ/us-west-2/'
+ '?format=text', {
+ # Send email as a text (instead of HTML)
+ 'instance': plugins.NotifySES,
+
+ # Our response expected server response
+ 'requests_response_text': AWS_SES_GOOD_RESPONSE,
+ }),
+ ('ses://user@example.com/T1JJ3T3L2/A1BRTD4JD/TIiacevi7FQ/us-west-2/'
+ '?to=invalid-email', {
+ # An invalid email will get dropped during the initialization
+ # we'll have no targets to notify afterwards
+ 'instance': plugins.NotifySES,
+
+ # Our response expected server response
+ 'requests_response_text': AWS_SES_GOOD_RESPONSE,
+
+ # As a result, we won't be able to notify anyone
+ 'notify_response': False,
+ }),
+ ('ses://user@example.com/T1JJ3T3L2/A1BRTD4JD/TIiacevi7FQ/us-west-2/'
+ 'user2@example.com', {
+ 'instance': plugins.NotifySES,
+ # Our response expected server response
+ 'requests_response_text': AWS_SES_GOOD_RESPONSE,
+ # throw a bizzare code forcing us to fail to look it up
+ 'response': False,
+ 'requests_response_code': 999,
+ }),
+ ('ses://user@example.com/T1JJ3T3L2/A1BRTD4JD/TIiajkdnlavi7FQ/us-west-2/'
+ 'user2@example.com', {
+ 'instance': plugins.NotifySES,
+ # Our response expected server response
+ 'requests_response_text': AWS_SES_GOOD_RESPONSE,
+ # Throws a series of connection and transfer exceptions when this
+ # flag is set and tests that we gracfully handle them
+ 'test_requests_exceptions': True,
+ }),
+)
+
+
+def test_plugin_ses_urls():
+ """
+ NotifySES() Apprise URLs
+
+ """
+
+ # Run our general tests
+ AppriseURLTester(tests=apprise_url_tests).run_all()
+
+
+# We initialize a post object just incase a test fails below
+# we don't want it sending any notifications upstream
+@mock.patch('requests.post')
+def test_plugin_ses_edge_cases(mock_post):
+ """
+ NotifySES() Edge Cases
+
+ """
+
+ # Initializes the plugin with a valid access, but invalid access key
+ with pytest.raises(TypeError):
+ # No access_key_id specified
+ plugins.NotifySES(
+ from_addr="user@example.eu",
+ access_key_id=None,
+ secret_access_key=TEST_ACCESS_KEY_SECRET,
+ region_name=TEST_REGION,
+ targets='user@example.ca',
+ )
+
+ with pytest.raises(TypeError):
+ # No secret_access_key specified
+ plugins.NotifySES(
+ from_addr="user@example.eu",
+ access_key_id=TEST_ACCESS_KEY_ID,
+ secret_access_key=None,
+ region_name=TEST_REGION,
+ targets='user@example.ca',
+ )
+
+ with pytest.raises(TypeError):
+ # No region_name specified
+ plugins.NotifySES(
+ from_addr="user@example.eu",
+ access_key_id=TEST_ACCESS_KEY_ID,
+ secret_access_key=TEST_ACCESS_KEY_SECRET,
+ region_name=None,
+ targets='user@example.ca',
+ )
+
+ # No recipients
+ obj = plugins.NotifySES(
+ from_addr="user@example.eu",
+ access_key_id=TEST_ACCESS_KEY_ID,
+ secret_access_key=TEST_ACCESS_KEY_SECRET,
+ region_name=TEST_REGION,
+ targets=None,
+ )
+
+ # The object initializes properly but would not be able to send anything
+ assert obj.notify(body='test', title='test') is False
+
+ # The phone number is invalid, and without it, there is nothing
+ # to notify; we
+ obj = plugins.NotifySES(
+ from_addr="user@example.eu",
+ access_key_id=TEST_ACCESS_KEY_ID,
+ secret_access_key=TEST_ACCESS_KEY_SECRET,
+ region_name=TEST_REGION,
+ targets='invalid-email',
+ )
+
+ # The object initializes properly but would not be able to send anything
+ assert obj.notify(body='test', title='test') is False
+
+
+def test_plugin_ses_url_parsing():
+ """
+ NotifySES() URL Parsing
+
+ """
+
+ # No recipients
+ results = plugins.NotifySES.parse_url('ses://%s/%s/%s/%s/' % (
+ 'user@example.com',
+ TEST_ACCESS_KEY_ID,
+ TEST_ACCESS_KEY_SECRET,
+ TEST_REGION)
+ )
+
+ # Confirm that there were no recipients found
+ assert len(results['targets']) == 0
+ assert 'region_name' in results
+ assert TEST_REGION == results['region_name']
+ assert 'access_key_id' in results
+ assert TEST_ACCESS_KEY_ID == results['access_key_id']
+ assert 'secret_access_key' in results
+ assert TEST_ACCESS_KEY_SECRET == results['secret_access_key']
+
+ # Detect recipients
+ results = plugins.NotifySES.parse_url('ses://%s/%s/%s/%s/%s/%s/' % (
+ 'user@example.com',
+ TEST_ACCESS_KEY_ID,
+ TEST_ACCESS_KEY_SECRET,
+ # Uppercase Region won't break anything
+ TEST_REGION.upper(),
+ 'user1@example.ca',
+ 'user2@example.eu')
+ )
+
+ # Confirm that our recipients were found
+ assert len(results['targets']) == 2
+ assert 'user1@example.ca' in results['targets']
+ assert 'user2@example.eu' in results['targets']
+ assert 'region_name' in results
+ assert TEST_REGION == results['region_name']
+ assert 'access_key_id' in results
+ assert TEST_ACCESS_KEY_ID == results['access_key_id']
+ assert 'secret_access_key' in results
+ assert TEST_ACCESS_KEY_SECRET == results['secret_access_key']
+
+
+def test_plugin_ses_aws_response_handling():
+ """
+ NotifySES() AWS Response Handling
+
+ """
+ # Not a string
+ response = plugins.NotifySES.aws_response_to_dict(None)
+ assert response['type'] is None
+ assert response['request_id'] is None
+
+ # Invalid XML
+ response = plugins.NotifySES.aws_response_to_dict(
+ '')
+ assert response['type'] is None
+ assert response['request_id'] is None
+
+ # Single Element in XML
+ response = plugins.NotifySES.aws_response_to_dict(
+ '')
+ assert response['type'] == 'SingleElement'
+ assert response['request_id'] is None
+
+ # Empty String
+ response = plugins.NotifySES.aws_response_to_dict('')
+ assert response['type'] is None
+ assert response['request_id'] is None
+
+ response = plugins.NotifySES.aws_response_to_dict(
+ """
+
+
+
+ 010f017d87656ee2-a2ea291f-79ea-44f3-9d25-00d041de307
+
+
+ 7abb454e-904b-4e46-a23c-2f4d2fc127a6
+
+
+ """)
+ assert response['type'] == 'SendRawEmailResponse'
+ assert response['request_id'] == '7abb454e-904b-4e46-a23c-2f4d2fc127a6'
+ assert response['message_id'] == \
+ '010f017d87656ee2-a2ea291f-79ea-44f3-9d25-00d041de307'
+
+ response = plugins.NotifySES.aws_response_to_dict(
+ """
+
+
+ Sender
+ InvalidParameter
+ Invalid parameter
+
+ b5614883-babe-56ca-93b2-1c592ba6191e
+
+ """)
+ assert response['type'] == 'ErrorResponse'
+ assert response['request_id'] == 'b5614883-babe-56ca-93b2-1c592ba6191e'
+ assert response['error_type'] == 'Sender'
+ assert response['error_code'] == 'InvalidParameter'
+ assert response['error_message'] == ('Invalid parameter')
+
+
+@mock.patch('requests.post')
+def test_plugin_ses_attachments(mock_post):
+ """
+ NotifySES() Attachment Checks
+
+ """
+ # Disable Throttling to speed testing
+ plugins.NotifySES.request_rate_per_sec = 0
+
+ # Prepare Mock return object
+ response = mock.Mock()
+ response.content = AWS_SES_GOOD_RESPONSE
+ response.status_code = requests.codes.ok
+ mock_post.return_value = response
+
+ # prepare our attachment
+ attach = AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.gif'))
+
+ # Test our markdown
+ obj = Apprise.instantiate('ses://%s/%s/%s/%s/' % (
+ 'user@example.com',
+ TEST_ACCESS_KEY_ID,
+ TEST_ACCESS_KEY_SECRET,
+ TEST_REGION)
+ )
+
+ # Send a good attachment
+ assert obj.notify(body="test", attach=attach) is True
+
+ # Reset our mock object
+ mock_post.reset_mock()
+
+ # Add another attachment so we drop into the area of the PushBullet code
+ # that sends remaining attachments (if more detected)
+ attach.add(os.path.join(TEST_VAR_DIR, 'apprise-test.gif'))
+
+ # Send our attachments
+ assert obj.notify(body="test", attach=attach) is True
+
+ # Test our call count
+ assert mock_post.call_count == 1
+
+ # Reset our mock object
+ mock_post.reset_mock()
+
+ # An invalid attachment will cause a failure
+ path = os.path.join(TEST_VAR_DIR, '/invalid/path/to/an/invalid/file.jpg')
+ attach = AppriseAttachment(path)
+ assert obj.notify(body="test", attach=attach) is False
diff --git a/test/test_plugin_sns.py b/test/test_plugin_sns.py
index 01187abf..f06ca80c 100644
--- a/test/test_plugin_sns.py
+++ b/test/test_plugin_sns.py
@@ -59,6 +59,11 @@ apprise_url_tests = (
# we have a valid URL and one number to text
'instance': plugins.NotifySNS,
}),
+ ('sns://?access=T1JJ3T3L2&secret=A1BRTD4JD/TIiajkdnlazkcevi7FQ'
+ '®ion=us-west-2&to=12223334444', {
+ # Initialize using get parameters instead
+ 'instance': plugins.NotifySNS,
+ }),
('sns://T1JJ3TD4JD/TIiajkdnlazk7FQ/us-west-2/12223334444/12223334445', {
# Multi SNS Suppport
'instance': plugins.NotifySNS,