mirror of https://github.com/caronc/apprise
PGP Encryption working
parent
1a9d1dddd5
commit
a9ed27ee02
|
@ -34,6 +34,7 @@ import typing as t
|
|||
from email.mime.text import MIMEText
|
||||
from email.mime.application import MIMEApplication
|
||||
from email.mime.multipart import MIMEMultipart
|
||||
from email.mime.base import MIMEBase
|
||||
from email.utils import formataddr, make_msgid
|
||||
from email.header import Header
|
||||
from email import charset
|
||||
|
@ -787,6 +788,10 @@ class NotifyEmail(NotifyBase):
|
|||
'There are no Email recipients to notify')
|
||||
return False
|
||||
|
||||
elif self.use_pgp and not PGP_SUPPORT:
|
||||
self.logger.warning('PGP Support unavailable')
|
||||
return False
|
||||
|
||||
messages: t.List[EmailMessage] = []
|
||||
|
||||
# Create a copy of the targets list
|
||||
|
@ -887,10 +892,33 @@ class NotifyEmail(NotifyBase):
|
|||
|
||||
if self.use_pgp:
|
||||
self.logger.debug("Securing email with PGP Encryption")
|
||||
# Set our header information to include in the encryption
|
||||
base['From'] = formataddr(
|
||||
(None, self.from_addr[1]), charset='utf-8')
|
||||
base['To'] = formataddr((None, to_addr), charset='utf-8')
|
||||
base['Subject'] = Header(title, self._get_charset(title))
|
||||
|
||||
# Apply our encryption
|
||||
encrypted_content = self.pgp_encrypt_message(base.as_string())
|
||||
if encrypted_content:
|
||||
base = MIMEText(encrypted_content, "plain")
|
||||
# prepare our messsage
|
||||
base = MIMEMultipart(
|
||||
"encrypted", protocol="application/pgp-encrypted")
|
||||
|
||||
# Store Autocrypt header (DeltaChat Support)
|
||||
base.add_header(
|
||||
"Autocrypt",
|
||||
"addr=%s; prefer-encrypt=mutual" % formataddr(
|
||||
(False, to_addr), charset='utf-8'))
|
||||
|
||||
# Set Encryption Info Part
|
||||
enc_payload = MIMEText("Version: 1", "plain")
|
||||
enc_payload.set_type("application/pgp-encrypted")
|
||||
base.attach(enc_payload)
|
||||
|
||||
enc_payload = MIMEBase("application", "octet-stream")
|
||||
enc_payload.set_payload(encrypted_content)
|
||||
base.attach(enc_payload)
|
||||
|
||||
# Apply any provided custom headers
|
||||
for k, v in self.headers.items():
|
||||
|
@ -984,7 +1012,13 @@ class NotifyEmail(NotifyBase):
|
|||
finally:
|
||||
# Gracefully terminate the connection with the server
|
||||
if socket is not None: # pragma: no branch
|
||||
socket.quit()
|
||||
try:
|
||||
socket.quit()
|
||||
|
||||
except (SocketError, smtplib.SMTPException):
|
||||
# No need to make this a bigger issue as we were exiting
|
||||
# anyway
|
||||
pass
|
||||
|
||||
# Reduce our dictionary (eliminate expired keys if any)
|
||||
self.pgp_public_keys = {
|
||||
|
@ -1079,8 +1113,7 @@ class NotifyEmail(NotifyBase):
|
|||
os.path.basename(pub_path))
|
||||
return True
|
||||
|
||||
@property
|
||||
def pgp_pubkey(self):
|
||||
def pgp_pubkey(self, email=None):
|
||||
"""
|
||||
Returns a list of filenames worth scanning for
|
||||
"""
|
||||
|
@ -1107,17 +1140,22 @@ class NotifyEmail(NotifyBase):
|
|||
'pub.asc',
|
||||
]
|
||||
|
||||
emails = []
|
||||
if email:
|
||||
emails.append(email)
|
||||
|
||||
# Prepare our key files:
|
||||
email = self.from_addr[1]
|
||||
emails.append(self.from_addr[1])
|
||||
|
||||
_entry = email.split('@')[0].lower()
|
||||
if _entry not in fnames:
|
||||
fnames.insert(0, f'{_entry}-pub.asc')
|
||||
for email in emails:
|
||||
_entry = email.split('@')[0].lower()
|
||||
if _entry not in fnames:
|
||||
fnames.insert(0, f'{_entry}-pub.asc')
|
||||
|
||||
# Lowercase email (Highest Priority)
|
||||
_entry = email.lower()
|
||||
if _entry not in fnames:
|
||||
fnames.insert(0, f'{_entry}-pub.asc')
|
||||
# Lowercase email (Highest Priority)
|
||||
_entry = email.lower()
|
||||
if _entry not in fnames:
|
||||
fnames.insert(0, f'{_entry}-pub.asc')
|
||||
|
||||
return next(
|
||||
(os.path.join(self.store.path, fname)
|
||||
|
@ -1125,17 +1163,17 @@ class NotifyEmail(NotifyBase):
|
|||
if os.path.isfile(os.path.join(self.store.path, fname))),
|
||||
None)
|
||||
|
||||
def pgp_public_key(self, path=None):
|
||||
def pgp_public_key(self, path=None, email=None):
|
||||
"""
|
||||
Opens a spcified pgp public file and returns the key from it which
|
||||
is used to encrypt the message
|
||||
"""
|
||||
if path is None:
|
||||
path = self.pgp_pubkey
|
||||
path = self.pgp_pubkey(email=email)
|
||||
|
||||
if not path:
|
||||
if self.pgp_generate_keys(path=self.store.path):
|
||||
path = self.pgp_pubkey
|
||||
path = self.pgp_pubkey(email=email)
|
||||
if path:
|
||||
# We should get a hit now
|
||||
return self.pgp_public_key(path=path)
|
||||
|
|
|
@ -2069,7 +2069,7 @@ def test_plugin_email_pgp(mock_smtp, mock_smtpssl, tmpdir):
|
|||
obj = Apprise.instantiate('mailto://user:pass@nuxref.com?pgp=yes')
|
||||
|
||||
# Nothing to lookup
|
||||
assert obj.pgp_pubkey is None
|
||||
assert obj.pgp_pubkey() is None
|
||||
assert obj.pgp_public_key() is None
|
||||
assert obj.pgp_encrypt_message("message") is False
|
||||
# Keys can not be generated in memory mode
|
||||
|
@ -2090,15 +2090,15 @@ def test_plugin_email_pgp(mock_smtp, mock_smtpssl, tmpdir):
|
|||
assert obj.store.mode == PersistentStoreMode.FLUSH
|
||||
|
||||
# Still no public key
|
||||
assert obj.pgp_pubkey is None
|
||||
assert obj.pgp_pubkey() is None
|
||||
|
||||
assert obj.pgp_generate_keys() is True
|
||||
# Now we'll have a public key
|
||||
assert isinstance(obj.pgp_pubkey, str)
|
||||
assert isinstance(obj.pgp_pubkey(), str)
|
||||
|
||||
# Prepare PGP
|
||||
obj = Apprise.instantiate(
|
||||
f'mailto://pgp:pass@nuxref.com?pgp=yes&pgpkey={obj.pgp_pubkey}',
|
||||
'mailto://pgp:pass@nuxref.com?pgp=yes&pgpkey=%s' % obj.pgp_pubkey(),
|
||||
asset=asset)
|
||||
|
||||
# We will find our key
|
||||
|
|
Loading…
Reference in New Issue