Browse Source

Support for PGP Email Support (DeltaChat Compatible) (#1205)

pull/1215/head
Chris Caron 2 months ago committed by GitHub
parent
commit
f656069e4a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 3
      all-plugin-requirements.txt
  2. 361
      apprise/plugins/email.py
  3. 561
      test/test_plugin_email.py
  4. 18
      test/var/pgp/corrupt-pub.asc
  5. 18
      test/var/pgp/valid-pub.asc

3
all-plugin-requirements.txt

@ -11,3 +11,6 @@ gntp
# Provides mqtt:// support
# use v1.x due to https://github.com/eclipse/paho.mqtt.python/issues/814
paho-mqtt < 2.0.0
# Pretty Good Privacy (PGP) Provides mailto:// and deltachat:// support
PGPy

361
apprise/plugins/email.py

@ -27,28 +27,42 @@
# POSSIBILITY OF SUCH DAMAGE.
import dataclasses
import os
import re
import smtplib
import typing as t
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email.utils import formataddr, make_msgid
from email.header import Header
from email import charset
import hashlib
from socket import error as SocketError
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from ..apprise_attachment import AppriseAttachment
from .base import NotifyBase
from ..url import PrivacyMode
from ..common import NotifyFormat, NotifyType
from ..common import NotifyFormat, NotifyType, PersistentStoreMode
from ..conversion import convert_between
from ..utils import is_ipaddr, is_email, parse_emails, is_hostname
from ..utils import is_ipaddr, is_email, parse_emails, is_hostname, parse_bool
from ..locale import gettext_lazy as _
from ..logger import logger
try:
import pgpy
# Pretty Good Privacy (PGP) Support enabled
PGP_SUPPORT = True
except ImportError:
# Pretty Good Privacy (PGP) Support disabled
PGP_SUPPORT = False
# Globally Default encoding mode set to Quoted Printable.
charset.add_charset('utf-8', charset.QP, charset.QP, 'utf-8')
@ -355,6 +369,10 @@ class NotifyEmail(NotifyBase):
# Support attachments
attachment_support = True
# There is no reason a PGP Public Key should exceed 8K in size
# If it is more than this, then it is not accepted
max_pgp_public_key_size = 8000
# Default Notify Format
notify_format = NotifyFormat.HTML
@ -439,6 +457,12 @@ class NotifyEmail(NotifyBase):
'type': 'string',
'map_to': 'smtp_host',
},
'pgp': {
'name': _('PGP Encryption'),
'type': 'bool',
'map_to': 'use_pgp',
'default': False,
},
'mode': {
'name': _('Secure Mode'),
'type': 'choice:string',
@ -451,6 +475,14 @@ class NotifyEmail(NotifyBase):
'type': 'list:string',
'map_to': 'reply_to',
},
'pgpkey': {
'name': _('PGP Public Key Path'),
'type': 'string',
'private': True,
# By default persistent storage is referenced
'default': '',
'map_to': 'pgp_key',
},
})
# Define any kwargs we're using
@ -463,7 +495,7 @@ class NotifyEmail(NotifyBase):
def __init__(self, smtp_host=None, from_addr=None, secure_mode=None,
targets=None, cc=None, bcc=None, reply_to=None, headers=None,
**kwargs):
use_pgp=None, pgp_key=None, **kwargs):
"""
Initialize Email Object
@ -500,6 +532,29 @@ class NotifyEmail(NotifyBase):
self.smtp_host = \
smtp_host if isinstance(smtp_host, str) else ''
# pgp hash
self.pgp_public_keys = {}
self.use_pgp = use_pgp if not None \
else self.template_args['pgp']['default']
if self.use_pgp and not PGP_SUPPORT:
self.logger.warning(
'PGP Support is not available on this installation; '
'ask admin to install PGPy')
# Our template object is just an AppriseAttachment object
if pgp_key:
self.pgp_key = AppriseAttachment(asset=self.asset)
# Add our definition to our pgp_key reference
self.pgp_key.add(pgp_key)
# Enforce maximum file size
self.pgp_key[0].max_file_size = self.max_pgp_public_key_size
else:
# No key; use auto-generation
self.pgp_key = None
# Now detect secure mode
if secure_mode:
self.secure_mode = None \
@ -733,6 +788,10 @@ class NotifyEmail(NotifyBase):
'There are no Email recipients to notify')
return False
elif self.use_pgp and not PGP_SUPPORT:
self.logger.warning('PGP Support unavailable')
return False
messages: t.List[EmailMessage] = []
# Create a copy of the targets list
@ -831,6 +890,40 @@ class NotifyEmail(NotifyBase):
mixed.attach(app)
base = mixed
if self.use_pgp:
self.logger.debug("Securing email with PGP Encryption")
# Set our header information to include in the encryption
base['From'] = formataddr(
(None, self.from_addr[1]), charset='utf-8')
base['To'] = formataddr((None, to_addr), charset='utf-8')
base['Subject'] = Header(title, self._get_charset(title))
# Apply our encryption
encrypted_content = self.pgp_encrypt_message(base.as_string())
if not encrypted_content:
self.logger.warning('Unable to PGP encrypt email')
# Unable to send notification
return False
# prepare our messsage
base = MIMEMultipart(
"encrypted", protocol="application/pgp-encrypted")
# Store Autocrypt header (DeltaChat Support)
base.add_header(
"Autocrypt",
"addr=%s; prefer-encrypt=mutual" % formataddr(
(False, to_addr), charset='utf-8'))
# Set Encryption Info Part
enc_payload = MIMEText("Version: 1", "plain")
enc_payload.set_type("application/pgp-encrypted")
base.attach(enc_payload)
enc_payload = MIMEBase("application", "octet-stream")
enc_payload.set_payload(encrypted_content)
base.attach(enc_payload)
# Apply any provided custom headers
for k, v in self.headers.items():
base[k] = Header(v, self._get_charset(v))
@ -901,38 +994,269 @@ class NotifyEmail(NotifyBase):
message.to_addrs,
message.body)
self.logger.info(
f'Sent Email notification to "{message.recipient}".')
self.logger.info('Sent Email to %s', message.recipient)
except (SocketError, smtplib.SMTPException, RuntimeError) as e:
self.logger.warning(
f'Sending email to "{message.recipient}" failed. '
f'Reason: {e}')
'Sending email to "%s" failed.', message.recipient)
self.logger.debug(f'Socket Exception: {e}')
# Mark as failure
has_error = True
except (SocketError, smtplib.SMTPException, RuntimeError) as e:
self.logger.warning(
f'Connection error while submitting email to {self.smtp_host}.'
f' Reason: {e}')
'Connection error while submitting email to "%s"',
self.smtp_host)
self.logger.debug(f'Socket Exception: {e}')
# Mark as failure
has_error = True
finally:
# Gracefully terminate the connection with the server
if socket is not None: # pragma: no branch
if socket is not None:
socket.quit()
# Reduce our dictionary (eliminate expired keys if any)
self.pgp_public_keys = {
key: value for key, value in self.pgp_public_keys.items()
if value['expires'] > datetime.now(timezone.utc)}
return not has_error
def pgp_generate_keys(self, path=None):
"""
Generates a set of keys based on email configured
"""
if path is None:
if self.store.mode == PersistentStoreMode.MEMORY:
# Not possible - no write permissions
return False
# Set our path
path = self.store.path
try:
# Create a new RSA key pair with 2048-bit strength
key = pgpy.PGPKey.new(
pgpy.constants.PubKeyAlgorithm.RSAEncryptOrSign, 2048)
except NameError:
# PGPy not installed
self.logger.debug('PGPy not installed; ignoring PGP file: %s')
return False
# Prepare our uid
name, email = self.names[self.from_addr[1]], self.from_addr[1]
uid = pgpy.PGPUID.new(name, email=email)
# Filenames
file_prefix = email.split('@')[0].lower()
pub_path = os.path.join(path, f'{file_prefix}-pub.asc')
prv_path = os.path.join(path, f'{file_prefix}-prv.asc')
if os.path.isfile(pub_path):
self.logger.warning(
'PGP generation aborted; Public key already exists: %s',
pub_path)
return True
# Add the user ID to the key
key.add_uid(uid, usage={
pgpy.constants.KeyFlags.Sign,
pgpy.constants.KeyFlags.EncryptCommunications},
hashes=[pgpy.constants.HashAlgorithm.SHA256],
ciphers=[pgpy.constants.SymmetricKeyAlgorithm.AES256],
compression=[pgpy.constants.CompressionAlgorithm.ZLIB])
try:
# Write our keys to disk
with open(pub_path, 'w') as f:
f.write(str(key.pubkey))
except OSError as e:
self.logger.warning('Error writing PGP file %s', pub_path)
self.logger.debug(f'I/O Exception: {e}')
# Cleanup
try:
os.unlink(pub_path)
self.logger.trace('Removed %s', pub_path)
except OSError:
pass
try:
with open(prv_path, 'w') as f:
f.write(str(key))
except OSError as e:
self.logger.warning('Error writing PGP file %s', prv_path)
self.logger.debug(f'I/O Exception: {e}')
try:
os.unlink(pub_path)
self.logger.trace('Removed %s', pub_path)
except OSError:
pass
try:
os.unlink(prv_path)
self.logger.trace('Removed %s', prv_path)
except OSError:
pass
return False
self.logger.info(
'Wrote PGP Keys for %s/%s',
os.path.dirname(pub_path),
os.path.basename(pub_path))
return True
def pgp_pubkey(self, email=None):
"""
Returns a list of filenames worth scanning for
"""
if self.pgp_key is not None:
# If our code reaches here, then we fetch our public key
pgp_key = self.pgp_key[0]
if not pgp_key:
# We could not access the attachment
self.logger.error(
'Could not access PGP Public Key {}.'.format(
pgp_key.url(privacy=True)))
return False
return pgp_key.path
elif not self.store.path:
# No path
return None
fnames = [
'pgp-public.asc',
'pgp-pub.asc',
'public.asc',
'pub.asc',
]
# Prepare our key files
emails = [self.from_addr[1]]
if email:
emails.append(email)
for email in emails:
_entry = email.split('@')[0].lower()
fnames.insert(0, f'{_entry}-pub.asc')
# Lowercase email (Highest Priority)
_entry = email.lower()
fnames.insert(0, f'{_entry}-pub.asc')
return next(
(os.path.join(self.store.path, fname)
for fname in fnames
if os.path.isfile(os.path.join(self.store.path, fname))),
None)
def pgp_public_key(self, path=None, email=None):
"""
Opens a spcified pgp public file and returns the key from it which
is used to encrypt the message
"""
if path is None:
path = self.pgp_pubkey(email=email)
if not path:
if self.pgp_generate_keys(path=self.store.path):
path = self.pgp_pubkey(email=email)
if path:
# We should get a hit now
return self.pgp_public_key(path=path)
self.logger.warning('No PGP Public Key could be loaded')
return None
# Persistent storage key
ps_key = hashlib.sha1(
os.path.abspath(path).encode('utf-8')).hexdigest()
if ps_key in self.pgp_public_keys:
# Take an early exit
return self.pgp_public_keys[ps_key]['public_key']
try:
with open(path, 'r') as key_file:
public_key, _ = pgpy.PGPKey.from_blob(key_file.read())
except NameError:
# PGPy not installed
self.logger.debug(
'PGPy not installed; skipping PGP support: %s', path)
return None
except FileNotFoundError:
# Generate keys
self.logger.debug('PGP Public Key file not found: %s', path)
return None
except OSError as e:
self.logger.warning('Error accessing PGP Public Key file %s', path)
self.logger.debug(f'I/O Exception: {e}')
return None
self.pgp_public_keys[ps_key] = {
'public_key': public_key,
'expires':
datetime.now(timezone.utc) + timedelta(seconds=86400)
}
return public_key
# Encrypt message using the recipient's public key
def pgp_encrypt_message(self, message, path=None):
"""
If provided a path to a pgp-key, content is encrypted
"""
# Acquire our key
public_key = self.pgp_public_key(path=path)
if not public_key:
# Encryption not possible
return False
try:
message_object = pgpy.PGPMessage.new(message)
encrypted_message = public_key.encrypt(message_object)
return str(encrypted_message)
except pgpy.errors.PGPError:
# Encryption not Possible
self.logger.debug(
'PGP Public Key Corruption; encryption not possible')
except NameError:
# PGPy not installed
self.logger.debug('PGPy not installed; Skipping PGP encryption')
return None
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define an URL parameters
params = {}
params = {
'pgp': 'yes' if self.use_pgp else 'no',
}
# Store oure public key back into your URL
if self.pgp_key is not None:
params['pgp_key'] = NotifyEmail.quote(
self.pgp_key[0].url(privacy=privacy), safe=':')
# Append our headers into our parameters
params.update({'+{}'.format(k): v for k, v in self.headers.items()})
@ -1044,7 +1368,7 @@ class NotifyEmail(NotifyBase):
"""
return (
self.secure_protocol if self.secure else self.protocol,
self.user, self.password, self.host,
self.user, self.password, self.host, self.smtp_host,
self.port if self.port
else SECURE_MODES[self.secure_mode]['default_port'],
)
@ -1053,8 +1377,7 @@ class NotifyEmail(NotifyBase):
"""
Returns the number of targets associated with this notification
"""
targets = len(self.targets)
return targets if targets > 0 else 1
return len(self.targets) if self.targets else 1
@staticmethod
def parse_url(url):
@ -1086,6 +1409,16 @@ class NotifyEmail(NotifyBase):
# value if invalid; we'll attempt to figure this out later on
results['host'] = ''
# Get PGP Flag
results['use_pgp'] = \
parse_bool(results['qsd'].get(
'pgp', NotifyEmail.template_args['pgp']['default']))
# Get PGP Public Key Override
if 'pgpkey' in results['qsd'] and results['qsd']['pgpkey']:
results['pgp_key'] = \
NotifyEmail.unquote(results['qsd']['pgpkey'])
# The From address is a must; either through the use of templates
# from= entry and/or merging the user and hostname together, this
# must be calculated or parse_url will fail.

561
test/test_plugin_email.py

File diff suppressed because it is too large Load Diff

18
test/var/pgp/corrupt-pub.asc

@ -0,0 +1,18 @@
-----BEGIN PGP PUBLIC KEY BLOCK-----
xsBNBGbo3ycBCACjECe63GpZanFYLE678OIhzpvY0J6pCBCZblGxXOuwuv7VPIq7
XQN91UdOL8huDb/JYRxarDoS6+JvyempbzZt0S+veXDl4pRFb+Q4a5sp1l/Mduw0
rDFErwfF8SpMPBI2WJUhN9n72UEqXVDvTPdcAka8v8p7dS6/RKZzch8P+EjgJME0
p9+N/2Lrbi2nDDXD+xD4Odw83J5V8Xn/jie3GCxtXda5XIX3EzTSB30elLLNBMcq
ooooooooooooooooooooooooooooo6bRS9TT34rZZk7qyB2iroq9SdIBCGyn1q4r
uIskVNCsqgP9cMa+S1XePUT77VNNN1yzhACpABEBAAHNIUNocmlzIENhcm9uIDxs
ZWFkMmdvbGRAZ21haWwuooooooooooooooooooooooooooooogILCQIVCAIWAgIe
ARYhBEHHWtq4Kh8dGraFnkmZAD9B29oPAAoJEEmZAD9B29oPAawIAImCijTdvDl8
Sibwo7gL4ooooooooooooooooooooooooooooofjiEEW8gVQ4W2KDs74aCGkQtQJ
irvNA7WnuyMyXZyvhYa63U7GTk5RdVkMygT0a5n8/8HVAenZrBL6VNaZYw/LlgWd
0knhsmqdGTsjKuYdZ3Cooooooooooooooooooooooooooooo2GWBnvOQje+lQGIf
rE6TIwsf4QoKXSkTakzggbpZZl2hg2O6dJiij1cH+DYFVTaVXw4rVmo8ckTJ9DiF
T9H/EmsNqlSKTTv1Aw4raCFZ+T/Ocsw/vIOoEtVhiT/mfDcIbi0VB3EhYvI3eFso
yiZsjyu9xY0=
=ZY2q
-----END PGP PUBLIC KEY BLOCK-----

18
test/var/pgp/valid-pub.asc

@ -0,0 +1,18 @@
-----BEGIN PGP PUBLIC KEY BLOCK-----
xsBNBGbo3ycBCACjECe63GpZanFYLE678OIhzpvY0J6pCBCZblGxXOuwuv7VPIq7
XQN91UdOL8huDb/JYRxarDoS6+JvyempbzZt0S+veXDl4pRFb+Q4a5sp1l/Mduw0
rDFErwfF8SpMPBI2WJUhN9n72UEqXVDvTPdcAka8v8p7dS6/RKZzch8P+EjgJME0
p9+N/2Lrbi2nDDXD+xD4Odw83J5V8Xn/jie3GCxtXda5XIX3EzTSB30elLLNBMcq
N5xJBTrjhciDzU85Gb+bUecnoj9Oj6bRS9TT34rZZk7qyB2iroq9SdIBCGyn1q4r
uIskVNCsqgP9cMa+S1XePUT77VNNN1yzhACpABEBAAHNIUNocmlzIENhcm9uIDxs
ZWFkMmdvbGRAZ21haWwuY29tPsLAggQTAQgALAUCZujfJwIbBgILCQIVCAIWAgIe
ARYhBEHHWtq4Kh8dGraFnkmZAD9B29oPAAoJEEmZAD9B29oPAawIAImCijTdvDl8
Sibwo7gL4ayF4S3KhaKCYORcMM1oe4pesy5ME6fjiEEW8gVQ4W2KDs74aCGkQtQJ
irvNA7WnuyMyXZyvhYa63U7GTk5RdVkMygT0a5n8/8HVAenZrBL6VNaZYw/LlgWd
0knhsmqdGTsjKuYdZ3CHED85pv/MOwe0pyGOQKtJ1t9qwc6l2GWBnvOQje+lQGIf
rE6TIwsf4QoKXSkTakzggbpZZl2hg2O6dJiij1cH+DYFVTaVXw4rVmo8ckTJ9DiF
T9H/EmsNqlSKTTv1Aw4raCFZ+T/Ocsw/vIOoEtVhiT/mfDcIbi0VB3EhYvI3eFso
yiZsjyu9xY0=
=ZY2q
-----END PGP PUBLIC KEY BLOCK-----
Loading…
Cancel
Save