diff --git a/apprise/plugins/NotifyBase.py b/apprise/plugins/NotifyBase.py index 53ec9c7c..19650d60 100644 --- a/apprise/plugins/NotifyBase.py +++ b/apprise/plugins/NotifyBase.py @@ -33,6 +33,7 @@ except ImportError: from ..utils import parse_url from ..utils import parse_bool +from ..utils import is_hostname from ..common import NOTIFY_IMAGE_SIZES from ..common import NOTIFY_TYPES @@ -80,8 +81,8 @@ NOTIFY_FORMATS = ( # Regular expression retrieved from: # http://www.regular-expressions.info/email.html IS_EMAIL_RE = re.compile( - r"(?P[a-z0-9!#$%&'*+/=?^_`{|}~-]+" - r"(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)" + r"(?P[a-z0-9$%+=_~-]+" + r"(?:\.[a-z0-9$%+=_~-]+)" r"*)@(?P(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+" r"[a-z0-9](?:[a-z0-9-]*" r"[a-z0-9]))?", @@ -312,6 +313,14 @@ class NotifyBase(object): """ return IS_EMAIL_RE.match(address) is not None + @staticmethod + def is_hostname(hostname): + """ + Returns True if specified entry is a hostname + + """ + return is_hostname(hostname) + @staticmethod def parse_url(url): """ diff --git a/apprise/plugins/NotifyEmail.py b/apprise/plugins/NotifyEmail.py index c399a768..84b3b1b0 100644 --- a/apprise/plugins/NotifyEmail.py +++ b/apprise/plugins/NotifyEmail.py @@ -19,13 +19,11 @@ import re from datetime import datetime -from smtplib import SMTP -from smtplib import SMTPException +import smtplib from socket import error as SocketError from email.mime.text import MIMEText -from ..utils import compat_is_basestring from .NotifyBase import NotifyBase from .NotifyBase import NotifyFormat @@ -49,7 +47,7 @@ WEBBASE_LOOKUP_TABLE = ( # Google GMail ( 'Google Mail', - re.compile('^(?P[^@]+)@(?Pgmail\.com)$', re.I), + re.compile(r'^(?P[^@]+)@(?Pgmail\.com)$', re.I), { 'port': 587, 'smtp_host': 'smtp.gmail.com', @@ -61,7 +59,7 @@ WEBBASE_LOOKUP_TABLE = ( # Pronto Mail ( 'Pronto Mail', - re.compile('^(?P[^@]+)@(?Pprontomail\.com)$', re.I), + re.compile(r'^(?P[^@]+)@(?Pprontomail\.com)$', re.I), { 'port': 465, 'smtp_host': 'secure.emailsrvr.com', @@ -73,7 +71,7 @@ WEBBASE_LOOKUP_TABLE = ( # Microsoft Hotmail ( 'Microsoft Hotmail', - re.compile('^(?P[^@]+)@(?P(hotmail|live)\.com)$', re.I), + re.compile(r'^(?P[^@]+)@(?P(hotmail|live)\.com)$', re.I), { 'port': 587, 'smtp_host': 'smtp.live.com', @@ -85,7 +83,7 @@ WEBBASE_LOOKUP_TABLE = ( # Yahoo Mail ( 'Yahoo Mail', - re.compile('^(?P[^@]+)@(?Pyahoo\.(ca|com))$', re.I), + re.compile(r'^(?P[^@]+)@(?Pyahoo\.(ca|com))$', re.I), { 'port': 465, 'smtp_host': 'smtp.mail.yahoo.com', @@ -97,7 +95,7 @@ WEBBASE_LOOKUP_TABLE = ( # Catch All ( 'Custom', - re.compile('^(?P[^@]+)@(?P.+)$', re.I), + re.compile(r'^(?P[^@]+)@(?P.+)$', re.I), { # Setting smtp_host to None is a way of # auto-detecting it based on other parameters @@ -159,28 +157,19 @@ class NotifyEmail(NotifyBase): # Now we want to construct the To and From email # addresses from the URL provided - self.from_name = kwargs.get('name', 'NZB Notification') + self.from_name = kwargs.get('name', NotifyBase.app_desc) self.from_addr = kwargs.get('from', None) - if not self.from_addr: - # Keep trying to be clever and make it equal to the to address - self.from_addr = self.to_addr - - if not compat_is_basestring(self.to_addr): - raise TypeError('No valid ~To~ email address specified.') if not NotifyBase.is_email(self.to_addr): raise TypeError('Invalid ~To~ email format: %s' % self.to_addr) - if not compat_is_basestring(self.from_addr): - raise TypeError('No valid ~From~ email address specified.') - match = NotifyBase.is_email(self.from_addr) if not match: # Parse Source domain based on from_addr raise TypeError('Invalid ~From~ email format: %s' % self.to_addr) # Now detect the SMTP Server - self.smtp_host = kwargs.get('smtp_host', None) + self.smtp_host = kwargs.get('smtp_host', '') # Apply any defaults based on certain known configurations self.NotifyEmailDefaults() @@ -269,7 +258,7 @@ class NotifyEmail(NotifyBase): try: self.logger.debug('Connecting to remote SMTP server...') - socket = SMTP( + socket = smtplib.SMTP( self.smtp_host, self.port, None, @@ -293,7 +282,7 @@ class NotifyEmail(NotifyBase): self.to_addr, )) - except (SocketError, SMTPException) as e: + except (SocketError, smtplib.SMTPException, RuntimeError) as e: self.logger.warning( 'A Connection error occured sending Email ' 'notification to %s.' % self.smtp_host) @@ -334,87 +323,51 @@ class NotifyEmail(NotifyBase): if 'format' in results['qsd'] and len(results['qsd']['format']): # Extract email format (Text/Html) + format = NotifyBase.unquote(results['qsd']['format']).lower() + if len(format) > 0 and format[0] == 't': + results['notify_format'] = NotifyFormat.TEXT + + if 'to' in results['qsd'] and len(results['qsd']['to']): + to_addr = NotifyBase.unquote(results['qsd']['to']).strip() + + else: + # get 'To' email address try: - format = NotifyBase.unquote(results['qsd']['format']).lower() - if len(format) > 0 and format[0] == 't': - results['notify_format'] = NotifyFormat.TEXT - - except AttributeError: - pass - - # get 'To' email address - try: - to_addr = filter(bool, NotifyBase.split_path(results['host']))[0] - - except (AttributeError, IndexError): - # No problem, we have other ways of getting - # the To address - pass - - if not NotifyBase.is_email(to_addr): - if results['user']: - # Try to be clever and build a potential - # email address based on what we've been provided to_addr = '%s@%s' % ( - re.split('[\s@]+', results['user'])[0], - re.split('[\s@]+', to_addr)[-1], + re.split( + '[\s@]+', NotifyBase.unquote(results['user']))[0], + results.get('host', '') ) - if not NotifyBase.is_email(to_addr): - NotifyBase.logger.error( - '%s does not contain a recipient email.' % - NotifyBase.unquote(results['url'].lstrip('/')), - ) - return None + except (AttributeError, IndexError): + # No problem, we have other ways of getting + # the To address + pass # Attempt to detect 'from' email address from_addr = to_addr - try: - if 'from' in results['qsd'] and len(results['qsd']['from']): - from_addr = results['qsd']['from'] - if not NotifyBase.is_email(results['qsd']['from']): - # Lets be clever and attempt to make the from - # address email - from_addr = '%s@%s' % ( - re.split('[\s@]+', from_addr)[0], - re.split('[\s@]+', to_addr)[-1], - ) + if 'from' in results['qsd'] and len(results['qsd']['from']): + from_addr = NotifyBase.unquote(results['qsd']['from']) + if not NotifyBase.is_email(from_addr): + # Lets be clever and attempt to make the from + # address an email based on the to address + from_addr = '%s@%s' % ( + re.split('[\s@]+', from_addr)[0], + re.split('[\s@]+', to_addr)[-1], + ) - if not NotifyBase.is_email(from_addr): - NotifyBase.logger.error( - '%s does not contain a from address.' % - NotifyBase.unquote(results['url'].lstrip('/')), - ) - return None + if 'name' in results['qsd'] and len(results['qsd']['name']): + # Extract from name to associate with from address + results['name'] = NotifyBase.unquote(results['qsd']['name']) - except AttributeError: - pass - - try: - if 'name' in results['qsd'] and len(results['qsd']['name']): - # Extract from name to associate with from address - results['name'] = NotifyBase.unquote(results['qsd']['name']) - - except AttributeError: - pass - - try: - if 'timeout' in results['qsd'] and len(results['qsd']['timeout']): - # Extract the timeout to associate with smtp server - results['timeout'] = NotifyBase.unquote( - results['qsd']['timeout']) - - except AttributeError: - pass + if 'timeout' in results['qsd'] and len(results['qsd']['timeout']): + # Extract the timeout to associate with smtp server + results['timeout'] = results['qsd']['timeout'] # Store SMTP Host if specified - try: - # Extract from password to associate with smtp server - if 'smtp' in results['qsd'] and len(results['qsd']['smtp']): - smtp_host = NotifyBase.unquote(results['qsd']['smtp']) - - except AttributeError: - pass + if 'smtp' in results['qsd'] and len(results['qsd']['smtp']): + # Extract the smtp server + smtp_host = NotifyBase.unquote(results['qsd']['smtp']) results['to'] = to_addr results['from'] = from_addr diff --git a/apprise/plugins/__init__.py b/apprise/plugins/__init__.py index 82e4f282..17ff54c0 100644 --- a/apprise/plugins/__init__.py +++ b/apprise/plugins/__init__.py @@ -16,6 +16,10 @@ # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. +# Used for Testing; specifically test_email_plugin.py needs access +# to the modules WEBBASE_LOOKUP_TABLE and WebBaseLogin objects +from . import NotifyEmail as NotifyEmailBase + from .NotifyBoxcar import NotifyBoxcar from .NotifyEmail import NotifyEmail from .NotifyFaast import NotifyFaast @@ -52,4 +56,7 @@ __all__ = [ # Reference 'NotifyImageSize', 'NOTIFY_IMAGE_SIZES', 'NotifyType', 'NOTIFY_TYPES', + + # NotifyEmail Base References (used for Testing) + 'NotifyEmailBase', ] diff --git a/apprise/utils.py b/apprise/utils.py index 87f10349..b567cd4b 100644 --- a/apprise/utils.py +++ b/apprise/utils.py @@ -85,6 +85,20 @@ TIDY_NUX_TRIM_RE = re.compile( ) +def is_hostname(hostname): + """ + Validate hostname + """ + if len(hostname) > 255 or len(hostname) == 0: + return False + + if hostname[-1] == ".": + hostname = hostname[:-1] + + allowed = re.compile("(?!-)[A-Z\d_-]{1,63}(? +# +# This file is part of apprise. +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. + +from apprise import plugins +from apprise import NotifyType +from apprise import Apprise +import smtplib +import mock +import re + + +VALID_URLS = ( + ################################## + # NotifyEmail + ################################## + ('mailto://', { + 'instance': None, + }), + ('mailtos://', { + 'instance': None, + }), + ('mailto://:@/', { + 'instance': None + }), + # No Username + ('mailtos://:pass@nuxref.com:567', { + # Can't prepare a To address using this expression + 'exception': TypeError, + }), + + # Pre-Configured Email Services + ('mailto://user:pass@gmail.com', { + 'instance': plugins.NotifyEmail, + }), + ('mailto://user:pass@hotmail.com', { + 'instance': plugins.NotifyEmail, + }), + ('mailto://user:pass@live.com', { + 'instance': plugins.NotifyEmail, + }), + ('mailto://user:pass@prontomail.com', { + 'instance': plugins.NotifyEmail, + }), + ('mailto://user:pass@yahoo.com', { + 'instance': plugins.NotifyEmail, + }), + ('mailto://user:pass@yahoo.ca', { + 'instance': plugins.NotifyEmail, + }), + + # Custom Emails + ('mailtos://user:pass@nuxref.com:567', { + 'instance': plugins.NotifyEmail, + }), + ('mailto://user:pass@nuxref.com:567?format=html', { + 'instance': plugins.NotifyEmail, + }), + ('mailtos://user:pass@nuxref.com:567?to=l2g@nuxref.com', { + 'instance': plugins.NotifyEmail, + }), + ( + 'mailtos://user:pass@example.com?smtp=smtp.example.com&timeout=5' + '&name=l2g&from=noreply@example.com', { + 'instance': plugins.NotifyEmail, + }, + ), + ('mailto://user:pass@example.com?timeout=invalid.entry', { + 'instance': plugins.NotifyEmail, + }), + ('mailto://user:pass@example.com?timeout=invalid.entry', { + 'instance': plugins.NotifyEmail, + }), + ( + 'mailto://user:pass@example.com:2525?user=l2g@example.com' + '&pass=l2g@apprise!is!Awesome', { + 'instance': plugins.NotifyEmail, + }, + ), + ( + 'mailto://user:pass@example.com:2525?user=l2g@example.com' + '&pass=l2g@apprise!is!Awesome&format=text', { + 'instance': plugins.NotifyEmail, + }, + ), + # No Password + ('mailtos://user:@nuxref.com', { + 'instance': plugins.NotifyEmail, + }), + # Invalid From Address (falls back to using To Address) + ('mailtos://user:pass@nuxref.com?from=@', { + 'exception': TypeError, + }), + # Invalid To Address + ('mailtos://nuxref.com?user=&pass=.', { + 'exception': TypeError, + }), + # Valid URL, but can't structure a proper email + ('mailtos://nuxref.com?user=%20!&pass=.', { + 'exception': TypeError, + }), + # Invalid To Address + ('mailtos://nuxref.com?to=test', { + 'exception': TypeError, + }), + # Can make a To address using what we have (l2g@nuxref.com) + ('mailtos://nuxref.com?user=l2g&pass=.', { + 'instance': plugins.NotifyEmail, + }), + ('mailto://user:pass@localhost:2525', { + 'instance': plugins.NotifyEmail, + # Throws a series of connection and transfer exceptions when this flag + # is set and tests that we gracfully handle them + 'test_smtplib_exceptions': True, + }), +) + + +@mock.patch('smtplib.SMTP') +def test_email_plugin(mock_smtp): + """ + API: NotifyEmail Plugin() + + """ + + # iterate over our dictionary and test it out + for (url, meta) in VALID_URLS: + + # Our expected instance + instance = meta.get('instance', None) + + # Our expected exception + exception = meta.get('exception', None) + + # Our expected server objects + self = meta.get('self', None) + + # Our expected Query response (True, False, or exception type) + response = meta.get('response', True) + + # Allow us to force the server response code to be something other then + # the defaults + smtplib_response_code = meta.get( + 'smtplib_response_code', 200 if response else 404) + + test_smtplib_exceptions = meta.get( + 'test_smtplib_exceptions', False) + + # Our mock of our socket action + mock_socket = mock.Mock() + mock_socket.starttls.return_value = True + mock_socket.login.return_value = True + + # Create a mock SMTP Object + mock_smtp.return_value = mock_socket + + if test_smtplib_exceptions is False: + pass + # Handle our default response + mock_socket.sendmail.return_value = smtplib_response_code + # mock_post.return_value.status_code = smtplib_response_code + # mock_get.return_value.status_code = smtplib_response_code + # mock_post.side_effect = None + # mock_get.side_effect = None + + else: + # Handle exception testing; first we turn the boolean flag ito + # a list of exceptions + test_smtplib_exceptions = ( + smtplib.SMTPHeloError( + 0, 'smtplib.SMTPHeloError() not handled'), + smtplib.SMTPException( + 0, 'smtplib.SMTPException() not handled'), + RuntimeError( + 0, 'smtplib.HTTPError() not handled'), + smtplib.SMTPRecipientsRefused( + 'smtplib.SMTPRecipientsRefused() not handled'), + smtplib.SMTPSenderRefused( + 0, 'smtplib.SMTPSenderRefused() not handled', + 'addr@example.com'), + smtplib.SMTPDataError( + 0, 'smtplib.SMTPDataError() not handled'), + smtplib.SMTPServerDisconnected( + 'smtplib.SMTPServerDisconnected() not handled'), + ) + + try: + obj = Apprise.instantiate(url, suppress_exceptions=False) + + assert(exception is None) + + if obj is None: + # We're done + continue + + if instance is None: + # Expected None but didn't get it + print('%s instantiated %s' % (url, str(obj))) + assert(False) + + assert(isinstance(obj, instance)) + + if self: + # Iterate over our expected entries inside of our object + for key, val in self.items(): + # Test that our object has the desired key + assert(hasattr(key, obj)) + assert(getattr(key, obj) == val) + + try: + if test_smtplib_exceptions is False: + # check that we're as expected + assert obj.notify( + title='test', body='body', + notify_type=NotifyType.INFO) == response + + else: + for exception in test_smtplib_exceptions: + mock_socket.sendmail.side_effect = exception + try: + assert obj.notify( + title='test', body='body', + notify_type=NotifyType.INFO) is False + + except AssertionError: + # Don't mess with these entries + raise + + except Exception as e: + # We can't handle this exception type + print('%s / %s' % (url, str(e))) + assert False + + except AssertionError: + # Don't mess with these entries + raise + + except Exception as e: + # Check that we were expecting this exception to happen + assert isinstance(e, response) + + except AssertionError: + # Don't mess with these entries + print('%s AssertionError' % url) + raise + + except Exception as e: + # Handle our exception + print('%s / %s' % (url, str(e))) + assert(exception is not None) + assert(isinstance(e, exception)) + + +@mock.patch('smtplib.SMTP') +def test_webbase_lookup(mock_smtp): + """ + API: Web Based Lookup Tests + + """ + + from apprise.plugins import NotifyEmailBase + + # Insert a test email at the head of our table + NotifyEmailBase.WEBBASE_LOOKUP_TABLE = ( + ( + # Testing URL + 'Testing Lookup', + re.compile(r'^(?P[^@]+)@(?Pl2g\.com)$', re.I), + { + 'port': 123, + 'smtp_host': 'smtp.l2g.com', + 'secure': True, + 'login_type': (NotifyEmailBase.WebBaseLogin.USERID, ) + }, + ), + ) + NotifyEmailBase.WEBBASE_LOOKUP_TABLE + + obj = Apprise.instantiate( + 'mailto://user:pass@l2g.com', suppress_exceptions=True) + + assert(isinstance(obj, plugins.NotifyEmail)) + assert obj.to_addr == 'user@l2g.com' + assert obj.from_addr == 'user@l2g.com' + assert obj.password == 'pass' + assert obj.user == 'user' + assert obj.secure is True + assert obj.port == 123 + assert obj.smtp_host == 'smtp.l2g.com' diff --git a/test/test_notifybase.py b/test/test_notify_base.py similarity index 93% rename from test/test_notifybase.py rename to test/test_notify_base.py index 1c836b9e..cc0ea52f 100644 --- a/test/test_notifybase.py +++ b/test/test_notify_base.py @@ -156,3 +156,11 @@ def test_notify_base_urls(): 'https://user:pass@localhost?user=newuser') assert 'user' in results assert results['user'] == "newuser" + + # Test invalid urls + assert NotifyBase.parse_url('https://:@/') is None + assert NotifyBase.parse_url('http://:@') is None + assert NotifyBase.parse_url('http://@') is None + assert NotifyBase.parse_url('http:///') is None + assert NotifyBase.parse_url('http://:test/') is None + assert NotifyBase.parse_url('http://pass:test/') is None diff --git a/test/test_plugins.py b/test/test_rest_plugins.py similarity index 92% rename from test/test_plugins.py rename to test/test_rest_plugins.py index c98e2a2d..4b1376a7 100644 --- a/test/test_plugins.py +++ b/test/test_rest_plugins.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# NotifyJSON Unit Tests +# REST Based Plugins - Unit Tests # # Copyright (C) 2017 Chris Caron # @@ -57,6 +57,9 @@ VALID_URLS = ( ('jsons://user:pass@localhost:8080', { 'instance': plugins.NotifyJSON, }), + ('json://:@/', { + 'instance': None, + }), ('json://user:pass@localhost:8081', { 'instance': plugins.NotifyJSON, # force a failure @@ -113,6 +116,9 @@ VALID_URLS = ( # Thrown because the webhook is not in a valid format 'exception': TypeError, }), + ('mmost://:@/', { + 'instance': None, + }), ('mmost://localhost/3ccdd113474722377935511fc85d3dd4', { 'instance': plugins.NotifyMatterMost, # force a failure @@ -136,9 +142,9 @@ VALID_URLS = ( @mock.patch('requests.get') @mock.patch('requests.post') -def test_plugins(mock_post, mock_get): +def test_rest_plugins(mock_post, mock_get): """ - API: Plugins() object + API: REST Based Plugins() """ @@ -199,6 +205,11 @@ def test_plugins(mock_post, mock_get): # We're done continue + if instance is None: + # Expected None but didn't get it + print('%s instantiated %s' % (url, str(obj))) + assert(False) + assert(isinstance(obj, instance)) if self: @@ -210,7 +221,7 @@ def test_plugins(mock_post, mock_get): try: if test_requests_exceptions is False: - # check tht we're as expected + # check that we're as expected assert obj.notify( title='test', body='body', notify_type=NotifyType.INFO) == response @@ -230,10 +241,12 @@ def test_plugins(mock_post, mock_get): except Exception as e: # We can't handle this exception type + print('%s / %s' % (url, str(e))) assert False except AssertionError: # Don't mess with these entries + print('%s AssertionError' % url) raise except Exception as e: @@ -242,9 +255,11 @@ def test_plugins(mock_post, mock_get): except AssertionError: # Don't mess with these entries + print('%s / %s' % (url, str(e))) raise except Exception as e: # Handle our exception + print('%s / %s' % (url, str(e))) assert(exception is not None) assert(isinstance(e, exception)) diff --git a/test/test_utils.py b/test/test_utils.py index 8d14dd8f..7b07ee54 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -264,6 +264,24 @@ def test_parse_bool(): assert(utils.parse_bool('OhYeah', True) is True) +def test_is_hostname(): + """ + API: is_hostname() function + + """ + # Valid Hostnames + assert utils.is_hostname('yahoo.ca') is True + assert utils.is_hostname('yahoo.ca.') is True + assert utils.is_hostname('valid-dashes-in-host.ca') is True + assert utils.is_hostname('valid-underscores_in_host.ca') is True + + # Invalid Hostnames + assert utils.is_hostname('invalid-characters_#^.ca') is False + assert utils.is_hostname(' spaces ') is False + assert utils.is_hostname(' ') is False + assert utils.is_hostname('') is False + + def test_parse_list(): "utils: parse_list() testing """