From e8b2d4221d2ac6a43188c3577a4974fe851d3df0 Mon Sep 17 00:00:00 2001 From: Chris Caron Date: Mon, 16 Sep 2024 21:50:20 -0400 Subject: [PATCH] Support for PGP Email Support --- all-plugin-requirements.txt | 3 + apprise/plugins/email.py | 275 ++++++++++++++++++++++++++++++++++-- test/test_plugin_email.py | 76 +++++++++- 3 files changed, 340 insertions(+), 14 deletions(-) diff --git a/all-plugin-requirements.txt b/all-plugin-requirements.txt index b2d0fc56..5d70bb40 100644 --- a/all-plugin-requirements.txt +++ b/all-plugin-requirements.txt @@ -11,3 +11,6 @@ gntp # Provides mqtt:// support # use v1.x due to https://github.com/eclipse/paho.mqtt.python/issues/814 paho-mqtt < 2.0.0 + +# Pretty Good Privacy (PGP) Provides mailto:// and deltachat:// support +PGPy diff --git a/apprise/plugins/email.py b/apprise/plugins/email.py index 2e423916..89eb1811 100644 --- a/apprise/plugins/email.py +++ b/apprise/plugins/email.py @@ -27,6 +27,7 @@ # POSSIBILITY OF SUCH DAMAGE. import dataclasses +import os import re import smtplib import typing as t @@ -36,19 +37,30 @@ from email.mime.multipart import MIMEMultipart from email.utils import formataddr, make_msgid from email.header import Header from email import charset +import hashlib from socket import error as SocketError from datetime import datetime +from datetime import timedelta from datetime import timezone from .base import NotifyBase from ..url import PrivacyMode -from ..common import NotifyFormat, NotifyType +from ..common import NotifyFormat, NotifyType, PersistentStoreMode from ..conversion import convert_between -from ..utils import is_ipaddr, is_email, parse_emails, is_hostname +from ..utils import is_ipaddr, is_email, parse_emails, is_hostname, parse_bool from ..locale import gettext_lazy as _ from ..logger import logger +try: + import pgpy + # Pretty Good Privacy (PGP) Support enabled + PGP_SUPPORT = True + +except ImportError: + # Pretty Good Privacy (PGP) Support disabled + PGP_SUPPORT = False + # Globally Default encoding mode set to Quoted Printable. charset.add_charset('utf-8', charset.QP, charset.QP, 'utf-8') @@ -439,6 +451,12 @@ class NotifyEmail(NotifyBase): 'type': 'string', 'map_to': 'smtp_host', }, + 'pgp': { + 'name': _('PGP Encryption'), + 'type': 'bool', + 'map_to': 'use_pgp', + 'default': False, + }, 'mode': { 'name': _('Secure Mode'), 'type': 'choice:string', @@ -463,7 +481,7 @@ class NotifyEmail(NotifyBase): def __init__(self, smtp_host=None, from_addr=None, secure_mode=None, targets=None, cc=None, bcc=None, reply_to=None, headers=None, - **kwargs): + use_pgp=None, **kwargs): """ Initialize Email Object @@ -500,6 +518,17 @@ class NotifyEmail(NotifyBase): self.smtp_host = \ smtp_host if isinstance(smtp_host, str) else '' + # pgp hash + self.pgp_public_keys = {} + + self.use_pgp = use_pgp if not None \ + else self.template_args['pgp']['default'] + + if self.use_pgp and not PGP_SUPPORT: + self.logger.warning( + 'PGP Support is not available on this installation; ' + 'ask admin to install PGPy') + # Now detect secure mode if secure_mode: self.secure_mode = None \ @@ -831,6 +860,12 @@ class NotifyEmail(NotifyBase): mixed.attach(app) base = mixed + if self.use_pgp: + # Apply our encryption + encrypted_content = self.pgp_encrypt_message(base.as_string()) + if encrypted_content: + base = MIMEText(encrypted_content, "plain") + # Apply any provided custom headers for k, v in self.headers.items(): base[k] = Header(v, self._get_charset(v)) @@ -901,20 +936,21 @@ class NotifyEmail(NotifyBase): message.to_addrs, message.body) - self.logger.info( - f'Sent Email notification to "{message.recipient}".') + self.logger.info('Sent Email to %s', message.recipient) + except (SocketError, smtplib.SMTPException, RuntimeError) as e: self.logger.warning( - f'Sending email to "{message.recipient}" failed. ' - f'Reason: {e}') + 'Sending email to "%s" failed.', message.recipient) + self.logger.debug(f'Socket Exception: {e}') # Mark as failure has_error = True except (SocketError, smtplib.SMTPException, RuntimeError) as e: self.logger.warning( - f'Connection error while submitting email to {self.smtp_host}.' - f' Reason: {e}') + 'Connection error while submitting email to "%s"', + self.smtp_host) + self.logger.debug(f'Socket Exception: {e}') # Mark as failure has_error = True @@ -924,15 +960,224 @@ class NotifyEmail(NotifyBase): if socket is not None: # pragma: no branch socket.quit() + # Reduce our dictionary (eliminate expired keys if any) + self.pgp_public_keys = { + key: value for key, value in self.pgp_public_keys.items() + if value['expires'] > datetime.now(timezone.utc)} + return not has_error + def pgp_generate_keys(self, path=None): + """ + Generates a set of keys based on email configured + """ + if path is None: + if self.store.mode == PersistentStoreMode.MEMORY: + # Not possible - no write permissions + return False + + # Set our path + path = self.store.path + + try: + # Create a new RSA key pair with 2048-bit strength + key = pgpy.PGPKey.new( + pgpy.constants.PubKeyAlgorithm.RSAEncryptOrSign, 2048) + + except NameError: + # PGPy not installed + self.logger.debug('PGPy not installed; ignoring PGP file: %s') + return False + + # Prepare our uid + name, email = self.names[self.from_addr[1]], self.from_addr[1] + uid = pgpy.PGPUID.new(name, email=email) + + # Filenames + file_prefix = email.split('@')[0].lower() + pub_path = os.path.join(path, f'{file_prefix}-pub.asc') + prv_path = os.path.join(path, f'{file_prefix}-prv.asc') + + # Add the user ID to the key + key.add_uid(uid, usage={ + pgpy.constants.KeyFlags.Sign, + pgpy.constants.KeyFlags.EncryptCommunications}, + hashes=[pgpy.constants.HashAlgorithm.SHA256], + ciphers=[pgpy.constants.SymmetricKeyAlgorithm.AES256], + compression=[pgpy.constants.CompressionAlgorithm.ZLIB]) + + try: + # Write our keys to disk + with open(pub_path, 'w') as f: + f.write(str(key.pubkey)) + + except OSError as e: + self.logger.warning('Error writing PGP file %s', pub_path) + self.logger.debug(f'I/O Exception: {e}') + + # Cleanup + try: + os.unlink(pub_path) + self.logger.trace('Removed %s', pub_path) + + except OSError: + pass + + try: + + with open(prv_path, 'w') as f: + f.write(str(key)) + + except OSError as e: + self.logger.warning('Error writing PGP file %s', prv_path) + self.logger.debug(f'I/O Exception: {e}') + try: + os.unlink(pub_path) + self.logger.trace('Removed %s', pub_path) + + except OSError: + pass + + try: + os.unlink(prv_path) + self.logger.trace('Removed %s', prv_path) + + except OSError: + pass + + return False + + self.logger.info( + 'Wrote PGP Keys for %s/%s', + os.path.dirname(pub_path), + os.path.basename(pub_path)) + return True + + @property + def pgp_fnames(self): + """ + Returns a list of filenames worth scanning for + """ + fnames = [ + 'pgp-public.asc', + 'pgp-pub.asc', + 'public.asc', + 'pub.asc', + ] + + # Prepare our key files: + email = self.from_addr[1] + + _entry = email.split('@')[0].lower() + if _entry not in fnames: + fnames.insert(0, f'{_entry}-pub.asc') + + # Lowercase email (Highest Priority) + _entry = email.lower() + if _entry not in fnames: + fnames.insert(0, f'{_entry}-pub.asc') + + return fnames + + def pgp_public_key(self, path=None): + """ + Opens a spcified pgp public file and returns the key from it which + is used to encrypt the message + """ + if path is None: + path = next( + (os.path.join(self.store.path, fname) + for fname in self.pgp_fnames + if os.path.isfile(os.path.join(self.store.path, fname))), + None) + + if not path: + if self.pgp_generate_keys(path=self.store.path): + path = next( + (os.path.join(self.store.path, fname) + for fname in self.pgp_fnames + if os.path.isfile( + os.path.join(self.store.path, fname))), None) + + if path: + # We should get a hit now + return self.pgp_public_key(path=path) + + self.logger.warning('No PGP Public Key could be loaded') + return None + + if not isinstance(path, str): + raise AttributeError( + 'Invalid path to PGP Public Key specified: %s: %s', + type(path), str(path)) + + # Persistent storage key: + ps_key = hashlib.sha1( + os.path.abspath(path).encode('utf-8')).hexdigest() + if ps_key in self.pgp_public_keys: + # Take an early exit + return self.pgp_public_keys[ps_key]['public_key'] + + try: + with open(path, 'r') as key_file: + public_key, _ = pgpy.PGPKey.from_blob(key_file.read()) + + except NameError: + # PGPy not installed + self.logger.debug( + 'PGPy not installed; skipping PGP support: %s', path) + return None + + except FileNotFoundError: + # Generate keys + self.logger.debug('PGP Public Key file not found: %s', path) + return None + + except OSError as e: + self.logger.warning('Error accessing PGP Public Key file %s', path) + self.logger.debug(f'I/O Exception: {e}') + return None + + self.store.set(ps_key, public_key, expires=86400) + self.pgp_public_keys[ps_key] = { + 'public_key': public_key, + 'expires': + datetime.now(timezone.utc) + timedelta(seconds=86400) + } + return public_key + + # Encrypt message using the recipient's public key + def pgp_encrypt_message(self, message, path=None): + """ + If provided a path to a pgp-key, content is encrypted + """ + + # Acquire our key + public_key = self.pgp_public_key(path=path) + if not public_key: + # Encryption not possible + return False + + try: + message_object = pgpy.PGPMessage.new(message) + encrypted_message = public_key.encrypt(message_object) + return str(encrypted_message) + + except NameError: + # PGPy not installed + self.logger.debug('PGPy not installed; Skipping PGP encryption') + + return None + def url(self, privacy=False, *args, **kwargs): """ Returns the URL built dynamically based on specified arguments. """ # Define an URL parameters - params = {} + params = { + 'pgp': 'yes' if self.use_pgp else 'no', + } # Append our headers into our parameters params.update({'+{}'.format(k): v for k, v in self.headers.items()}) @@ -1044,7 +1289,7 @@ class NotifyEmail(NotifyBase): """ return ( self.secure_protocol if self.secure else self.protocol, - self.user, self.password, self.host, + self.user, self.password, self.host, self.smtp_host, self.port if self.port else SECURE_MODES[self.secure_mode]['default_port'], ) @@ -1053,8 +1298,7 @@ class NotifyEmail(NotifyBase): """ Returns the number of targets associated with this notification """ - targets = len(self.targets) - return targets if targets > 0 else 1 + return len(self.targets) if self.targets else 1 @staticmethod def parse_url(url): @@ -1086,6 +1330,11 @@ class NotifyEmail(NotifyBase): # value if invalid; we'll attempt to figure this out later on results['host'] = '' + # Get PGP Flag + results['use_pgp'] = \ + parse_bool(results['qsd'].get( + 'pgp', NotifyEmail.template_args['pgp']['default'])) + # The From address is a must; either through the use of templates # from= entry and/or merging the user and hostname together, this # must be calculated or parse_url will fail. diff --git a/test/test_plugin_email.py b/test/test_plugin_email.py index 59b4d384..6142e9d8 100644 --- a/test/test_plugin_email.py +++ b/test/test_plugin_email.py @@ -29,6 +29,7 @@ import logging import pytest import os +import sys import re from unittest import mock from inspect import cleandoc @@ -40,6 +41,7 @@ from apprise import NotifyType, NotifyBase from apprise import Apprise from apprise import AttachBase from apprise import AppriseAsset +from apprise import PersistentStoreMode from apprise.config import ConfigBase from apprise import AppriseAttachment from apprise.plugins.email import NotifyEmail @@ -48,7 +50,6 @@ from apprise.plugins import email as NotifyEmailModule # Disable logging for a cleaner testing output logging.disable(logging.CRITICAL) - # Attachment Directory TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var') @@ -130,6 +131,10 @@ TEST_URLS = ( ('mailtos://%20@domain.com?user=admin@mail-domain.com', { 'instance': NotifyEmail, }), + ('mailtos://%20@domain.com?user=admin@mail-domain.com?pgp=yes', { + # Test pgp flag + 'instance': NotifyEmail, + }), ('mailtos://user:pass@nuxref.com:567/l2g@nuxref.com', { 'instance': NotifyEmail, }), @@ -2049,3 +2054,72 @@ def test_plugin_email_by_ipaddr_1113(mock_smtp, mock_smtp_ssl): assert email.smtp_host == '10.0.0.195' assert email.port == 25 assert email.targets == [(False, 'alerts@example.com')] + + +@pytest.mark.skipif('pgpy' not in sys.modules, reason="Requires PGPy") +@mock.patch('smtplib.SMTP_SSL') +@mock.patch('smtplib.SMTP') +def test_plugin_email_pgp(mock_smtp, mock_smtpssl, tmpdir): + """ + NotifyEmail() PGP Tests + + """ + + # Initialize our email (no from name) + obj = Apprise.instantiate('mailto://user:pass@nuxref.com?pgp=yes') + + # Test our names + fnames = obj.pgp_fnames + assert isinstance(fnames, list) + + # login is pgp + obj = Apprise.instantiate('mailto://pgp:pass@nuxref.com?pgp=yes') + + # Test our names + fnames = obj.pgp_fnames + assert isinstance(fnames, list) + + # login is pgp + obj = Apprise.instantiate('mailto://chris:pass@nuxref.com?pgp=yes') + fnames = obj.pgp_fnames + assert isinstance(fnames, list) + + # Attempt to generate keys + obj = Apprise.instantiate('mailto://chris:pass@nuxref.com?pgp=yes') + # We're in memory mode + assert obj.store.mode == PersistentStoreMode.MEMORY + assert obj.pgp_generate_keys() is False + tmpdir1 = tmpdir.mkdir('tmp01') + # However explicitly setting a path works + assert obj.pgp_generate_keys(str(tmpdir1)) is True + + tmpdir2 = tmpdir.mkdir('tmp02') + asset = AppriseAsset( + storage_mode=PersistentStoreMode.FLUSH, + storage_path=str(tmpdir2), + ) + obj = Apprise.instantiate( + 'mailto://chris:pass@nuxref.com?pgp=yes', asset=asset) + + assert obj.store.mode == PersistentStoreMode.FLUSH + assert obj.pgp_generate_keys() is True + + # We do this again but even when we do a requisition for a public key + # it will generate a new pair or keys for us once it detects we don't + # have any + tmpdir3 = tmpdir.mkdir('tmp03') + asset = AppriseAsset( + storage_mode=PersistentStoreMode.FLUSH, + storage_path=str(tmpdir3), + ) + obj = Apprise.instantiate( + 'mailto://chris:pass@nuxref.com?pgp=yes', asset=asset) + + assert obj.store.mode == PersistentStoreMode.FLUSH + + # We'll have a public key object to encrypt with + assert obj.pgp_public_key() is not None + + encrypted = obj.pgp_encrypt_message("hello world") + assert encrypted.startswith('-----BEGIN PGP MESSAGE-----') + assert encrypted.rstrip().endswith('-----END PGP MESSAGE-----')