mirror of https://github.com/caronc/apprise
test coverage complete
parent
b8a3b6edfe
commit
800aad7765
|
@ -900,25 +900,29 @@ class NotifyEmail(NotifyBase):
|
|||
|
||||
# Apply our encryption
|
||||
encrypted_content = self.pgp_encrypt_message(base.as_string())
|
||||
if encrypted_content:
|
||||
# prepare our messsage
|
||||
base = MIMEMultipart(
|
||||
"encrypted", protocol="application/pgp-encrypted")
|
||||
if not encrypted_content:
|
||||
self.logger.warning('Unable to PGP encrypt email')
|
||||
# Unable to send notification
|
||||
return False
|
||||
|
||||
# Store Autocrypt header (DeltaChat Support)
|
||||
base.add_header(
|
||||
"Autocrypt",
|
||||
"addr=%s; prefer-encrypt=mutual" % formataddr(
|
||||
(False, to_addr), charset='utf-8'))
|
||||
# prepare our messsage
|
||||
base = MIMEMultipart(
|
||||
"encrypted", protocol="application/pgp-encrypted")
|
||||
|
||||
# Set Encryption Info Part
|
||||
enc_payload = MIMEText("Version: 1", "plain")
|
||||
enc_payload.set_type("application/pgp-encrypted")
|
||||
base.attach(enc_payload)
|
||||
# Store Autocrypt header (DeltaChat Support)
|
||||
base.add_header(
|
||||
"Autocrypt",
|
||||
"addr=%s; prefer-encrypt=mutual" % formataddr(
|
||||
(False, to_addr), charset='utf-8'))
|
||||
|
||||
enc_payload = MIMEBase("application", "octet-stream")
|
||||
enc_payload.set_payload(encrypted_content)
|
||||
base.attach(enc_payload)
|
||||
# Set Encryption Info Part
|
||||
enc_payload = MIMEText("Version: 1", "plain")
|
||||
enc_payload.set_type("application/pgp-encrypted")
|
||||
base.attach(enc_payload)
|
||||
|
||||
enc_payload = MIMEBase("application", "octet-stream")
|
||||
enc_payload.set_payload(encrypted_content)
|
||||
base.attach(enc_payload)
|
||||
|
||||
# Apply any provided custom headers
|
||||
for k, v in self.headers.items():
|
||||
|
@ -1147,13 +1151,11 @@ class NotifyEmail(NotifyBase):
|
|||
|
||||
for email in emails:
|
||||
_entry = email.split('@')[0].lower()
|
||||
if _entry not in fnames:
|
||||
fnames.insert(0, f'{_entry}-pub.asc')
|
||||
fnames.insert(0, f'{_entry}-pub.asc')
|
||||
|
||||
# Lowercase email (Highest Priority)
|
||||
_entry = email.lower()
|
||||
if _entry not in fnames:
|
||||
fnames.insert(0, f'{_entry}-pub.asc')
|
||||
fnames.insert(0, f'{_entry}-pub.asc')
|
||||
|
||||
return next(
|
||||
(os.path.join(self.store.path, fname)
|
||||
|
@ -1179,7 +1181,7 @@ class NotifyEmail(NotifyBase):
|
|||
self.logger.warning('No PGP Public Key could be loaded')
|
||||
return None
|
||||
|
||||
# Persistent storage key:
|
||||
# Persistent storage key
|
||||
ps_key = hashlib.sha1(
|
||||
os.path.abspath(path).encode('utf-8')).hexdigest()
|
||||
if ps_key in self.pgp_public_keys:
|
||||
|
@ -1230,6 +1232,11 @@ class NotifyEmail(NotifyBase):
|
|||
encrypted_message = public_key.encrypt(message_object)
|
||||
return str(encrypted_message)
|
||||
|
||||
except pgpy.errors.PGPError:
|
||||
# Encryption not Possible
|
||||
self.logger.debug(
|
||||
'PGP Public Key Corruption; encryption not possible')
|
||||
|
||||
except NameError:
|
||||
# PGPy not installed
|
||||
self.logger.debug('PGPy not installed; Skipping PGP encryption')
|
||||
|
|
|
@ -2147,6 +2147,12 @@ def test_plugin_email_pgp(mock_smtp, mock_smtpssl, tmpdir):
|
|||
# However explicitly setting a path works
|
||||
assert obj.pgp_generate_keys(str(tmpdir1)) is True
|
||||
|
||||
# Prepare Invalid PGP Key
|
||||
obj = Apprise.instantiate(
|
||||
'mailto://pgp:pass@nuxref.com?pgp=yes&pgpkey=invalid',
|
||||
asset=asset)
|
||||
assert obj.pgp_pubkey() is False
|
||||
|
||||
tmpdir2 = tmpdir.mkdir('tmp02')
|
||||
asset = AppriseAsset(
|
||||
storage_mode=PersistentStoreMode.FLUSH,
|
||||
|
@ -2204,6 +2210,10 @@ def test_plugin_email_pgp(mock_smtp, mock_smtpssl, tmpdir):
|
|||
os.path.join(obj.store.path, 'user@example.com-pub.asc'))
|
||||
|
||||
assert obj.pgp_pubkey() is None
|
||||
assert obj.pgp_pubkey(email="not-reference@example.com") is None
|
||||
assert obj.pgp_pubkey(email="user@example.com")\
|
||||
.endswith('user@example.com-pub.asc')
|
||||
|
||||
assert obj.pgp_pubkey(email="user@example.com")\
|
||||
.endswith('user@example.com-pub.asc')
|
||||
assert obj.pgp_pubkey(email="User@Example.com")\
|
||||
|
@ -2214,6 +2224,7 @@ def test_plugin_email_pgp(mock_smtp, mock_smtpssl, tmpdir):
|
|||
os.path.join(obj.store.path, 'user@example.com-pub.asc'),
|
||||
os.path.join(obj.store.path, 'user-pub.asc'),
|
||||
)
|
||||
|
||||
assert obj.pgp_pubkey(email="user@example.com")\
|
||||
.endswith('user@example.com-pub.asc')
|
||||
assert obj.pgp_pubkey(email="User@Example.com")\
|
||||
|
@ -2255,13 +2266,20 @@ def test_plugin_email_pgp(mock_smtp, mock_smtpssl, tmpdir):
|
|||
obj = Apprise.instantiate(
|
||||
'mailto://chris:pass@nuxref.com/user@example.com?pgp=yes', asset=asset)
|
||||
|
||||
with mock.patch('builtins.open', side_effect=OSError()):
|
||||
with mock.patch('builtins.open', side_effect=FileNotFoundError):
|
||||
# can't open key
|
||||
assert obj.pgp_public_key(path=obj.store.path) is None
|
||||
|
||||
with mock.patch('builtins.open', side_effect=OSError):
|
||||
# can't open key
|
||||
assert obj.pgp_public_key(path=obj.store.path) is None
|
||||
# Test unlink
|
||||
with mock.patch('os.unlink', side_effect=OSError()):
|
||||
with mock.patch('os.unlink', side_effect=OSError):
|
||||
assert obj.pgp_public_key(path=obj.store.path) is None
|
||||
|
||||
# Key Generation will fail
|
||||
assert obj.pgp_generate_keys() is False
|
||||
|
||||
with mock.patch('pgpy.PGPKey.new', side_effect=NameError):
|
||||
# Can't Generate keys
|
||||
assert obj.pgp_generate_keys() is False
|
||||
|
@ -2282,3 +2300,67 @@ def test_plugin_email_pgp(mock_smtp, mock_smtpssl, tmpdir):
|
|||
|
||||
with mock.patch('pgpy.PGPMessage.new', side_effect=NameError):
|
||||
assert obj.pgp_encrypt_message("message") is None
|
||||
# Attempts to encrypt a message
|
||||
assert obj.notify('test') is False
|
||||
|
||||
# Create new keys
|
||||
assert obj.pgp_generate_keys() is True
|
||||
with mock.patch('os.path.isfile', return_value=False):
|
||||
with mock.patch('builtins.open', side_effect=OSError):
|
||||
with mock.patch('os.unlink', return_value=None):
|
||||
assert obj.pgp_generate_keys() is False
|
||||
|
||||
# Testing again
|
||||
tmpdir5 = tmpdir.mkdir('tmp05')
|
||||
asset = AppriseAsset(
|
||||
storage_mode=PersistentStoreMode.FLUSH,
|
||||
storage_path=str(tmpdir5),
|
||||
)
|
||||
obj = Apprise.instantiate(
|
||||
'mailto://chris:pass@nuxref.com/user@example.com?pgp=yes', asset=asset)
|
||||
|
||||
# Catch edge case where we just can't generate the the key
|
||||
with mock.patch('os.path.isfile', side_effect=(
|
||||
# 5x False to skip through pgp_pubkey()
|
||||
False, False, False, False, False, False,
|
||||
# 1x True to pass pgp_generate_keys()
|
||||
True,
|
||||
# 5x False to skip through pgp_pubkey() second call
|
||||
False, False, False, False, False, False)):
|
||||
with mock.patch('pgpy.PGPKey.from_blob',
|
||||
side_effect=FileNotFoundError):
|
||||
assert obj.pgp_public_key() is None
|
||||
|
||||
# Corrupt Data
|
||||
tmpdir6 = tmpdir.mkdir('tmp06')
|
||||
asset = AppriseAsset(
|
||||
storage_mode=PersistentStoreMode.FLUSH,
|
||||
storage_path=str(tmpdir6),
|
||||
)
|
||||
obj = Apprise.instantiate(
|
||||
'mailto://chris:pass@nuxref.com/user@example.com?pgp=yes', asset=asset)
|
||||
|
||||
shutil.copyfile(
|
||||
os.path.join(TEST_VAR_DIR, 'pgp', 'corrupt-pub.asc'),
|
||||
os.path.join(obj.store.path, 'chris-pub.asc'),
|
||||
)
|
||||
|
||||
# Key is corrupted
|
||||
obj.notify('test') is False
|
||||
|
||||
shutil.copyfile(
|
||||
os.path.join(TEST_VAR_DIR, 'apprise-test.jpeg'),
|
||||
os.path.join(obj.store.path, 'chris-pub.asc'),
|
||||
)
|
||||
|
||||
# Key is a binary image; definitely not a valid key
|
||||
obj.notify('test') is False
|
||||
|
||||
# Using a public key
|
||||
shutil.copyfile(
|
||||
os.path.join(TEST_VAR_DIR, 'pgp', 'valid-pub.asc'),
|
||||
os.path.join(obj.store.path, 'chris-pub.asc'),
|
||||
)
|
||||
|
||||
# Notification goes through
|
||||
obj.notify('test') is True
|
||||
|
|
|
@ -1,31 +0,0 @@
|
|||
-----BEGIN PGP PRIVATE KEY BLOCK-----
|
||||
|
||||
xcLYBGbo3ycBCACjECe63GpZanFYLE678OIhzpvY0J6pCBCZblGxXOuwuv7VPIq7
|
||||
XQN91UdOL8huDb/JYRxarDoS6+JvyempbzZt0S+veXDl4pRFb+Q4a5sp1l/Mduw0
|
||||
rDFErwfF8SpMPBI2WJUhN9n72UEqXVDvTPdcAka8v8p7dS6/RKZzch8P+EjgJME0
|
||||
p9+N/2Lrbi2nDDXD+xD4Odw83J5V8Xn/jie3GCxtXda5XIX3EzTSB30elLLNBMcq
|
||||
N5xJBTrjhciDzU85Gb+bUecnoj9Oj6bRS9TT34rZZk7qyB2iroq9SdIBCGyn1q4r
|
||||
uIskVNCsqgP9cMa+S1XePUT77VNNN1yzhACpABEBAAEAB/4tOpPqjqyo9I9Px6pn
|
||||
Et+GRQqRTvxTIjuIc0MRkRaGxLdeahaI9bm8M2Y9158ed43Uy6zTsaXCDc+W9khr
|
||||
iL9uInG5mFOqT/iUcf65b49wQVf9HJdT3Ncll+7uBoCW+KqMjHGA7z71TkN2/r8u
|
||||
QQjzamY4gHInYE+BGgeZSfQ3t1MJMSdjQopPGDwSsco5hQJYtVH8K/0/Ig5S5E9Q
|
||||
KD9ku3W9bCliERkljIwEbbyDv/vmbxPKdWW83T+UQK6CQhkH6h69EoMGQ76a6y/H
|
||||
UuppNSpxuR4BiX4ZlcUyARrLluaRS0K1/OZCoScA0LLjY8pCEBoWT0uhhvy3t2xD
|
||||
/bipBADS3bM3yGGZKtNgLPQx0BAyWk07OD3AlObykz4yTIc9DZj1bzhHKgAhwUAN
|
||||
k7StwA22HoxMCKSoxhherZaXAQaJJOJKNXw3DphHCexrBq77nxBu3yo9UStj04Lx
|
||||
tCEibclsQcwgh7TjjjDQdRYiirZvu9IGQBf27xKvTepibn7NnwQAxfcePfXsHza3
|
||||
7CuJbxOGFaPf4ENSpFRYSZbH3dErtSlGDzz8e8jI0Ck9LQgp9MfjksWUwaMQbXdV
|
||||
zNbQe1lAWQxtN9amVvEWvrAJhbhEU6RLsSjpZ9W5r3xAbfkoDg/icjbdoOqwI6LE
|
||||
aTEhwaz+XZMLYJiT22AMJyC7TcL6fLcD/0nBRheQBqTsuYKimKI5yZ3ZdlGHfaLN
|
||||
OqMGfuaEQCUAhSaXNliuP3XAWfiVXCaRw9De+Eod6DfGMGTTx8EVy7N4y4w3TORp
|
||||
fFKaMGD3oiw1Eh63K1jV2yWPPpOnyc+YtXCPuGS+n/3CITc5cKxaPapQtCJA9Gw0
|
||||
OaZ7ikUNs0Q9NbfNIUNocmlzIENhcm9uIDxsZWFkMmdvbGRAZ21haWwuY29tPsLA
|
||||
ggQTAQgALAUCZujfJwIbBgILCQIVCAIWAgIeARYhBEHHWtq4Kh8dGraFnkmZAD9B
|
||||
29oPAAoJEEmZAD9B29oPAawIAImCijTdvDl8Sibwo7gL4ayF4S3KhaKCYORcMM1o
|
||||
e4pesy5ME6fjiEEW8gVQ4W2KDs74aCGkQtQJirvNA7WnuyMyXZyvhYa63U7GTk5R
|
||||
dVkMygT0a5n8/8HVAenZrBL6VNaZYw/LlgWd0knhsmqdGTsjKuYdZ3CHED85pv/M
|
||||
Owe0pyGOQKtJ1t9qwc6l2GWBnvOQje+lQGIfrE6TIwsf4QoKXSkTakzggbpZZl2h
|
||||
g2O6dJiij1cH+DYFVTaVXw4rVmo8ckTJ9DiFT9H/EmsNqlSKTTv1Aw4raCFZ+T/O
|
||||
csw/vIOoEtVhiT/mfDcIbi0VB3EhYvI3eFsoyiZsjyu9xY0=
|
||||
=dBp6
|
||||
-----END PGP PRIVATE KEY BLOCK-----
|
Loading…
Reference in New Issue