Refactored base64 attachment handling

pull/1191/head
Chris Caron 2024-08-27 17:51:08 -04:00
parent ca50cb7820
commit 7e12e1d54a
13 changed files with 216 additions and 134 deletions

View File

@ -291,7 +291,7 @@ class AttachBase(URLBase):
return False if not retrieve_if_missing else self.download()
def base64(self, encoding='utf-8'):
def base64(self, encoding='ascii'):
"""
Returns the attachment object as a base64 string otherwise
None is returned if an error occurs.
@ -306,7 +306,7 @@ class AttachBase(URLBase):
raise exception.AppriseFileNotFound("Attachment Missing")
try:
with open(self.path, 'rb') as f:
with self.open() as f:
# Prepare our Attachment in Base64
return base64.b64encode(f.read()).decode(encoding) \
if encoding else base64.b64encode(f.read())

View File

@ -29,7 +29,9 @@
import re
import os
import io
import base64
from .base import AttachBase
from .. import exception
from ..common import ContentLocation
from ..locale import gettext_lazy as _
import uuid
@ -145,6 +147,23 @@ class AttachMemory(AttachBase):
return True
def base64(self, encoding='ascii'):
"""
We need to over-ride this since the base64 sub-library seems to close
our file descriptor making it no longer referencable.
"""
if not self:
# We could not access the attachment
self.logger.error(
'Could not access attachment {}.'.format(
self.url(privacy=True)))
raise exception.AppriseFileNotFound("Attachment Missing")
self._data.seek(0, 0)
return base64.b64encode(self._data.read()).decode(encoding) \
if encoding else base64.b64encode(self._data.read())
def invalidate(self):
"""
Removes data

View File

@ -29,8 +29,8 @@
import re
import requests
from json import dumps
import base64
from .. import exception
from .base import NotifyBase
from ..url import PrivacyMode
from ..common import NotifyType
@ -261,21 +261,20 @@ class NotifyAppriseAPI(NotifyBase):
if not attachment:
# We could not access the attachment
self.logger.error(
'Could not access attachment {}.'.format(
'Could not access Apprise API attachment {}.'.format(
attachment.url(privacy=True)))
return False
try:
if self.method == AppriseAPIMethod.JSON:
with open(attachment.path, 'rb') as f:
# Output must be in a DataURL format (that's what
# PushSafer calls it):
attachments.append({
'filename': attachment.name,
'base64': base64.b64encode(f.read())
.decode('utf-8'),
'mimetype': attachment.mimetype,
})
# Output must be in a DataURL format (that's what
# PushSafer calls it):
attachments.append({
"filename": attachment.name
if attachment.name else f'file{no:03}.dat',
'base64': attachment.base64(),
'mimetype': attachment.mimetype,
})
else: # AppriseAPIMethod.FORM
files.append((
@ -287,13 +286,17 @@ class NotifyAppriseAPI(NotifyBase):
)
))
except (OSError, IOError) as e:
self.logger.warning(
'An I/O error occurred while reading {}.'.format(
attachment.name if attachment else 'attachment'))
self.logger.debug('I/O Exception: %s' % str(e))
except (TypeError, OSError, exception.AppriseException):
# We could not access the attachment
self.logger.error(
'Could not access AppriseAPI attachment {}.'.format(
attachment.url(privacy=True)))
return False
self.logger.debug(
'Appending AppriseAPI attachment {}'.format(
attachment.url(privacy=True)))
# prepare Apprise API Object
payload = {
# Apprise API Payload

View File

@ -27,9 +27,9 @@
# POSSIBILITY OF SUCH DAMAGE.
import requests
import base64
from json import dumps
from .. import exception
from .base import NotifyBase
from ..url import PrivacyMode
from ..common import NotifyImageSize
@ -213,33 +213,34 @@ class NotifyJSON(NotifyBase):
# Track our potential attachments
attachments = []
if attach and self.attachment_support:
for attachment in attach:
for no, attachment in enumerate(attach):
# Perform some simple error checking
if not attachment:
# We could not access the attachment
self.logger.error(
'Could not access attachment {}.'.format(
'Could not access Custom JSON attachment {}.'.format(
attachment.url(privacy=True)))
return False
try:
with open(attachment.path, 'rb') as f:
# Output must be in a DataURL format (that's what
# PushSafer calls it):
attachments.append({
'filename': attachment.name,
'base64': base64.b64encode(f.read())
.decode('utf-8'),
'mimetype': attachment.mimetype,
})
attachments.append({
"filename": attachment.name
if attachment.name else f'file{no:03}.dat',
'base64': attachment.base64(),
'mimetype': attachment.mimetype,
})
except (OSError, IOError) as e:
self.logger.warning(
'An I/O error occurred while reading {}.'.format(
attachment.name if attachment else 'attachment'))
self.logger.debug('I/O Exception: %s' % str(e))
except exception.AppriseException:
# We could not access the attachment
self.logger.error(
'Could not access Custom JSON attachment {}.'.format(
attachment.url(privacy=True)))
return False
self.logger.debug(
'Appending Custom JSON attachment {}'.format(
attachment.url(privacy=True)))
# Prepare JSON Object
payload = {
JSONPayloadField.VERSION: self.json_version,

View File

@ -28,8 +28,8 @@
import re
import requests
import base64
from .. import exception
from .base import NotifyBase
from ..url import PrivacyMode
from ..common import NotifyImageSize
@ -287,35 +287,39 @@ class NotifyXML(NotifyBase):
attachments = []
if attach and self.attachment_support:
for attachment in attach:
for no, attachment in enumerate(attach, start=1):
# Perform some simple error checking
if not attachment:
# We could not access the attachment
self.logger.error(
'Could not access attachment {}.'.format(
'Could not access Custom XML attachment {}.'.format(
attachment.url(privacy=True)))
return False
try:
with open(attachment.path, 'rb') as f:
# Prepare our Attachment in Base64
entry = \
'<Attachment filename="{}" mimetype="{}">'.format(
NotifyXML.escape_html(
attachment.name, whitespace=False),
NotifyXML.escape_html(
attachment.mimetype, whitespace=False))
entry += base64.b64encode(f.read()).decode('utf-8')
entry += '</Attachment>'
attachments.append(entry)
# Prepare our Attachment in Base64
entry = \
'<Attachment filename="{}" mimetype="{}">'.format(
NotifyXML.escape_html(
attachment.name if attachment.name
else f'file{no:03}.dat', whitespace=False),
NotifyXML.escape_html(
attachment.mimetype, whitespace=False))
entry += attachment.base64()
entry += '</Attachment>'
attachments.append(entry)
except (OSError, IOError) as e:
self.logger.warning(
'An I/O error occurred while reading {}.'.format(
attachment.name if attachment else 'attachment'))
self.logger.debug('I/O Exception: %s' % str(e))
except exception.AppriseException:
# We could not access the attachment
self.logger.error(
'Could not access Custom XML attachment {}.'.format(
attachment.url(privacy=True)))
return False
self.logger.debug(
'Appending Custom XML attachment {}'.format(
attachment.url(privacy=True)))
# Update our xml_attachments record:
xml_attachments = \
'<Attachments format="base64">' + \

View File

@ -26,11 +26,11 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import base64
import requests
from json import loads
from .base import NotifyBase
from .. import exception
from ..common import NotifyType
from ..utils import parse_list
from ..utils import validate_regex
@ -553,7 +553,7 @@ class NotifyPushSafer(NotifyBase):
if not attachment:
# We could not access the attachment
self.logger.error(
'Could not access attachment {}.'.format(
'Could not access PushSafer attachment {}.'.format(
attachment.url(privacy=True)))
return False
@ -569,24 +569,26 @@ class NotifyPushSafer(NotifyBase):
attachment.url(privacy=True)))
try:
with open(attachment.path, 'rb') as f:
# Output must be in a DataURL format (that's what
# PushSafer calls it):
attachment = (
attachment.name,
'data:{};base64,{}'.format(
attachment.mimetype,
base64.b64encode(f.read())))
# Output must be in a DataURL format (that's what
# PushSafer calls it):
attachments.append((
attachment.name,
'data:{};base64,{}'.format(
attachment.mimetype,
attachment.base64,
)
))
except (OSError, IOError) as e:
self.logger.warning(
'An I/O error occurred while reading {}.'.format(
attachment.name if attachment else 'attachment'))
self.logger.debug('I/O Exception: %s' % str(e))
except exception.AppriseException:
# We could not access the attachment
self.logger.error(
'Could not access PushSafer attachment {}.'.format(
attachment.url(privacy=True)))
return False
# Save our pre-prepared payload for attachment posting
attachments.append(attachment)
self.logger.debug(
'Appending PushSafer attachment {}'.format(
attachment.url(privacy=True)))
# Create a copy of the targets list
targets = list(self.targets)

View File

@ -342,11 +342,19 @@ class NotifySendGrid(NotifyBase):
# Send our attachments
for no, attachment in enumerate(attach, start=1):
# Perform some simple error checking
if not attachment:
# We could not access the attachment
self.logger.error(
'Could not access SendGrid attachment {}.'.format(
attachment.url(privacy=True)))
return False
try:
attachments.append({
"content": attachment.base64(),
"filename": attachment.name
if attachment.name else f'attach{no:03}.dat',
if attachment.name else f'file{no:03}.dat',
"type": "application/octet-stream",
"disposition": "attachment"
})
@ -354,7 +362,7 @@ class NotifySendGrid(NotifyBase):
except exception.AppriseException:
# We could not access the attachment
self.logger.error(
'Could not access attachment {}.'.format(
'Could not access SendGrid attachment {}.'.format(
attachment.url(privacy=True)))
return False

View File

@ -29,10 +29,10 @@
import re
import requests
from json import dumps
import base64
from .base import NotifyBase
from ..common import NotifyType
from .. import exception
from ..utils import is_phone_no
from ..utils import parse_phone_no
from ..utils import parse_bool
@ -239,23 +239,24 @@ class NotifySignalAPI(NotifyBase):
if not attachment:
# We could not access the attachment
self.logger.error(
'Could not access attachment {}.'.format(
'Could not access Signal API attachment {}.'.format(
attachment.url(privacy=True)))
return False
try:
with open(attachment.path, 'rb') as f:
# Prepare our Attachment in Base64
attachments.append(
base64.b64encode(f.read()).decode('utf-8'))
attachments.append(attachment.base64())
except (OSError, IOError) as e:
self.logger.warning(
'An I/O error occurred while reading {}.'.format(
attachment.name if attachment else 'attachment'))
self.logger.debug('I/O Exception: %s' % str(e))
except exception.AppriseException:
# We could not access the attachment
self.logger.error(
'Could not access Signal API attachment {}.'.format(
attachment.url(privacy=True)))
return False
self.logger.debug(
'Appending Signal API attachment {}'.format(
attachment.url(privacy=True)))
# Prepare our headers
headers = {
'User-Agent': self.app_id,

View File

@ -29,11 +29,11 @@
import re
import requests
from json import dumps, loads
import base64
from itertools import chain
from .base import NotifyBase
from ..common import NotifyType
from .. import exception
from ..utils import validate_regex
from ..utils import is_phone_no
from ..utils import parse_phone_no
@ -345,7 +345,7 @@ class NotifySMSEagle(NotifyBase):
if not attachment:
# We could not access the attachment
self.logger.error(
'Could not access attachment {}.'.format(
'Could not access SMSEagle attachment {}.'.format(
attachment.url(privacy=True)))
return False
@ -357,21 +357,23 @@ class NotifySMSEagle(NotifyBase):
continue
try:
with open(attachment.path, 'rb') as f:
# Prepare our Attachment in Base64
attachments.append({
'content_type': attachment.mimetype,
'content': base64.b64encode(
f.read()).decode('utf-8'),
})
# Prepare our Attachment in Base64
attachments.append({
'content_type': attachment.mimetype,
'content': attachment.base64(),
})
except (OSError, IOError) as e:
self.logger.warning(
'An I/O error occurred while reading {}.'.format(
attachment.name if attachment else 'attachment'))
self.logger.debug('I/O Exception: %s' % str(e))
except exception.AppriseException:
# We could not access the attachment
self.logger.error(
'Could not access SMSEagle attachment {}.'.format(
attachment.url(privacy=True)))
return False
self.logger.debug(
'Appending SMSEagle attachment {}'.format(
attachment.url(privacy=True)))
# Prepare our headers
headers = {
'User-Agent': self.app_id,

View File

@ -45,11 +45,11 @@
# the email will be transmitted from. If no email address is specified
# then it will also become the 'to' address as well.
#
import base64
import requests
from json import dumps
from email.utils import formataddr
from .base import NotifyBase
from .. import exception
from ..common import NotifyType
from ..common import NotifyFormat
from ..utils import parse_emails
@ -299,28 +299,29 @@ class NotifySMTP2Go(NotifyBase):
if not attachment:
# We could not access the attachment
self.logger.error(
'Could not access attachment {}.'.format(
'Could not access SMTP2Go attachment {}.'.format(
attachment.url(privacy=True)))
return False
try:
with open(attachment.path, 'rb') as f:
# Output must be in a DataURL format (that's what
# PushSafer calls it):
attachments.append({
'filename': attachment.name,
'fileblob': base64.b64encode(f.read())
.decode('utf-8'),
'mimetype': attachment.mimetype,
})
# Format our attachment
attachments.append({
'filename': attachment.name,
'fileblob': attachment.base64(),
'mimetype': attachment.mimetype,
})
except (OSError, IOError) as e:
self.logger.warning(
'An I/O error occurred while reading {}.'.format(
attachment.name if attachment else 'attachment'))
self.logger.debug('I/O Exception: %s' % str(e))
except exception.AppriseException:
# We could not access the attachment
self.logger.error(
'Could not access SMTP2Go attachment {}.'.format(
attachment.url(privacy=True)))
return False
self.logger.debug(
'Appending SMTP2Go attachment {}'.format(
attachment.url(privacy=True)))
sender = formataddr(
(self.from_name if self.from_name else False,
self.from_addr), charset='utf-8')

View File

@ -55,10 +55,10 @@
# API Documentation: https://developers.sparkpost.com/api/
# Specifically: https://developers.sparkpost.com/api/transmissions/
import requests
import base64
from json import loads
from json import dumps
from .base import NotifyBase
from .. import exception
from ..common import NotifyType
from ..common import NotifyFormat
from ..utils import is_email
@ -500,7 +500,7 @@ class NotifySparkPost(NotifyBase):
if not self.targets:
# There is no one to email; we're done
self.logger.warning(
'There are no Email recipients to notify')
'There are no SparkPost Email recipients to notify')
return False
# Initialize our has_error flag
@ -551,30 +551,29 @@ class NotifySparkPost(NotifyBase):
if not attachment:
# We could not access the attachment
self.logger.error(
'Could not access attachment {}.'.format(
'Could not access SparkPost attachment {}.'.format(
attachment.url(privacy=True)))
return False
try:
# Prepare API Upload Payload
payload['content']['attachments'].append({
'name': attachment.name,
'type': attachment.mimetype,
'data': attachment.base64(),
})
except exception.AppriseException:
# We could not access the attachment
self.logger.error(
'Could not access SparkPost attachment {}.'.format(
attachment.url(privacy=True)))
return False
self.logger.debug(
'Preparing SparkPost attachment {}'.format(
'Appending SparkPost attachment {}'.format(
attachment.url(privacy=True)))
try:
with open(attachment.path, 'rb') as fp:
# Prepare API Upload Payload
payload['content']['attachments'].append({
'name': attachment.name,
'type': attachment.mimetype,
'data': base64.b64encode(fp.read()).decode("ascii")
})
except (OSError, IOError) as e:
self.logger.warning(
'An I/O error occurred while reading {}.'.format(
attachment.name if attachment else 'attachment'))
self.logger.debug('I/O Exception: %s' % str(e))
return False
# Take a copy of our token dictionary
tokens = self.tokens.copy()

View File

@ -29,6 +29,8 @@
import re
from unittest import mock
import pytest
from apprise import exception
import requests
import mimetypes
from os.path import join
@ -481,3 +483,33 @@ def test_attach_http(mock_get, mock_post):
assert mock_post.call_count == 30
# We only fetched once and re-used the same fetch for all posts
assert mock_get.call_count == 1
#
# We will test our base64 handling now
#
mock_get.reset_mock()
mock_post.reset_mock()
AttachHTTP.max_file_size = getsize(path)
# Set ourselves a Content-Disposition (providing a filename)
dummy_response.headers['Content-Disposition'] = \
'attachment; filename="myimage.gif"'
results = AttachHTTP.parse_url('http://user@localhost/filename.gif')
assert isinstance(results, dict)
obj = AttachHTTP(**results)
# now test our base64 output
assert isinstance(obj.base64(), str)
# No encoding if we choose
assert isinstance(obj.base64(encoding=None), bytes)
# Error cases:
with mock.patch("builtins.open", new_callable=mock.mock_open,
read_data="mocked file content") as mock_file:
mock_file.side_effect = FileNotFoundError
with pytest.raises(exception.AppriseFileNotFound):
obj.base64()
mock_file.side_effect = OSError
with pytest.raises(exception.AppriseDiskIOError):
obj.base64()

View File

@ -30,6 +30,7 @@ import re
import urllib
import pytest
from apprise import exception
from apprise.attachment.base import AttachBase
from apprise.attachment.memory import AttachMemory
from apprise import AppriseAttachment
@ -203,3 +204,12 @@ def test_attach_memory():
# Test hosted configuration and that we can't add a valid memory file
aa = AppriseAttachment(location=ContentLocation.HOSTED)
assert aa.add(response) is False
# now test our base64 output
assert isinstance(response.base64(), str)
# No encoding if we choose
assert isinstance(response.base64(encoding=None), bytes)
response.invalidate()
with pytest.raises(exception.AppriseFileNotFound):
response.base64()