refactored code to parse URL correctly

pull/1225/head
Chris Caron 2024-10-27 16:31:25 -04:00
parent bb5218afe6
commit 5a3248147b
2 changed files with 215 additions and 63 deletions

View File

@ -50,6 +50,24 @@ from ..utils import validate_regex
from ..locale import gettext_lazy as _ from ..locale import gettext_lazy as _
class Office365WebhookMode:
"""
Office 365 Webhook Mode
"""
# Send message as ourselves using the /me/ endpoint
SELF = 'self'
# Send message as ourselves using the /users/ endpoint
AS_USER = 'user'
# Define the modes in a list for validation purposes
OFFICE365_WEBHOOK_MODES = (
Office365WebhookMode.SELF,
Office365WebhookMode.AS_USER,
)
class NotifyOffice365(NotifyBase): class NotifyOffice365(NotifyBase):
""" """
A wrapper for Office 365 Notifications A wrapper for Office 365 Notifications
@ -62,7 +80,7 @@ class NotifyOffice365(NotifyBase):
service_url = 'https://office.com/' service_url = 'https://office.com/'
# The default protocol # The default protocol
secure_protocol = 'o365' secure_protocol = ('azure', 'o365')
# Allow 300 requests per minute. # Allow 300 requests per minute.
# 60/300 = 0.2 # 60/300 = 0.2
@ -103,7 +121,6 @@ class NotifyOffice365(NotifyBase):
'{schema}://{email}/{tenant}/{client_id}/{secret}', '{schema}://{email}/{tenant}/{client_id}/{secret}',
'{schema}://{email}/{tenant}/{client_id}/{secret}/{targets}', '{schema}://{email}/{tenant}/{client_id}/{secret}/{targets}',
# Send from 'me' # Send from 'me'
'{schema}://{tenant}/{client_id}/{secret}',
'{schema}://{tenant}/{client_id}/{secret}/{targets}', '{schema}://{tenant}/{client_id}/{secret}/{targets}',
) )
@ -164,15 +181,33 @@ class NotifyOffice365(NotifyBase):
'oauth_secret': { 'oauth_secret': {
'alias_of': 'secret', 'alias_of': 'secret',
}, },
'mode': {
'name': _('Webhook Mode'),
'type': 'choice:string',
'values': OFFICE365_WEBHOOK_MODES,
'default': Office365WebhookMode.SELF,
},
}) })
def __init__(self, tenant, email, client_id, secret, def __init__(self, tenant, client_id, secret, email=None,
targets=None, cc=None, bcc=None, **kwargs): mode=None, targets=None, cc=None, bcc=None, **kwargs):
""" """
Initialize Office 365 Object Initialize Office 365 Object
""" """
super().__init__(**kwargs) super().__init__(**kwargs)
# Prepare our Mode
self.mode = self.template_args['mode']['default'] \
if not mode else next(
(f for f in OFFICE365_WEBHOOK_MODES
if f.startswith(
mode.lower())), None)
if mode and not self.mode:
msg = \
'The specified Webhook mode ({}) was not found '.format(mode)
self.logger.warning(msg)
raise TypeError(msg)
# Tenant identifier # Tenant identifier
self.tenant = validate_regex( self.tenant = validate_regex(
tenant, *self.template_tokens['tenant']['regex']) tenant, *self.template_tokens['tenant']['regex'])
@ -182,16 +217,24 @@ class NotifyOffice365(NotifyBase):
self.logger.warning(msg) self.logger.warning(msg)
raise TypeError(msg) raise TypeError(msg)
result = is_email(email) self.email = None
if not result: if email is not None:
msg = 'An invalid Office 365 Email Account ID' \ result = is_email(email)
'({}) was specified.'.format(email) if not result:
msg = 'An invalid Office 365 Email Account ID' \
'({}) was specified.'.format(email)
self.logger.warning(msg)
raise TypeError(msg)
# Otherwise store our the email address
self.email = result['full_email']
elif self.mode != Office365WebhookMode.SELF:
msg = 'An expected Office 365 Email was not specified ' \
'(mode={})'.format(self.mode)
self.logger.warning(msg) self.logger.warning(msg)
raise TypeError(msg) raise TypeError(msg)
# Otherwise store our the email address
self.email = result['full_email']
# Client Key (associated with generated OAuth2 Login) # Client Key (associated with generated OAuth2 Login)
self.client_id = validate_regex( self.client_id = validate_regex(
client_id, *self.template_tokens['client_id']['regex']) client_id, *self.template_tokens['client_id']['regex'])
@ -318,7 +361,7 @@ class NotifyOffice365(NotifyBase):
# Define our URL to post to # Define our URL to post to
url = '{graph_url}/v1.0/me/sendMail'.format( url = '{graph_url}/v1.0/me/sendMail'.format(
graph_url=self.graph_url, graph_url=self.graph_url,
) if not self.self.email \ ) if not self.email \
else '{graph_url}/v1.0/users/{userid}/sendMail'.format( else '{graph_url}/v1.0/users/{userid}/sendMail'.format(
userid=self.email, userid=self.email,
graph_url=self.graph_url, graph_url=self.graph_url,
@ -616,7 +659,7 @@ class NotifyOffice365(NotifyBase):
here. here.
""" """
return ( return (
self.secure_protocol, self.email, self.tenant, self.client_id, self.secure_protocol[0], self.email, self.tenant, self.client_id,
self.secret, self.secret,
) )
@ -625,8 +668,13 @@ class NotifyOffice365(NotifyBase):
Returns the URL built dynamically based on specified arguments. Returns the URL built dynamically based on specified arguments.
""" """
# Our URL parameters # Define any URL parameters
params = self.url_parameters(privacy=privacy, *args, **kwargs) params = {
'mode': self.mode,
}
# Extend our parameters
params.update(self.url_parameters(privacy=privacy, *args, **kwargs))
if self.cc: if self.cc:
# Handle our Carbon Copy Addresses # Handle our Carbon Copy Addresses
@ -642,13 +690,13 @@ class NotifyOffice365(NotifyBase):
'' if not self.names.get(e) '' if not self.names.get(e)
else '{}:'.format(self.names[e]), e) for e in self.bcc]) else '{}:'.format(self.names[e]), e) for e in self.bcc])
return '{schema}://{tenant}:{email}/{client_id}/{secret}' \ return '{schema}://{email}{tenant}/{client_id}/{secret}' \
'/{targets}/?{params}'.format( '/{targets}/?{params}'.format(
schema=self.secure_protocol, schema=self.secure_protocol[0],
tenant=self.pprint(self.tenant, privacy, safe=''), tenant=self.pprint(self.tenant, privacy, safe=''),
# email does not need to be escaped because it should # email does not need to be escaped because it should
# already be a valid host and username at this point # already be a valid host and username at this point
email=self.email, email=self.email + '/' if self.email else '',
client_id=self.pprint(self.client_id, privacy, safe=''), client_id=self.pprint(self.client_id, privacy, safe=''),
secret=self.pprint( secret=self.pprint(
self.secret, privacy, mode=PrivacyMode.Secret, self.secret, privacy, mode=PrivacyMode.Secret,
@ -656,7 +704,7 @@ class NotifyOffice365(NotifyBase):
targets='/'.join( targets='/'.join(
[NotifyOffice365.quote('{}{}'.format( [NotifyOffice365.quote('{}{}'.format(
'' if not e[0] else '{}:'.format(e[0]), e[1]), '' if not e[0] else '{}:'.format(e[0]), e[1]),
safe='') for e in self.targets]), safe='@') for e in self.targets]),
params=NotifyOffice365.urlencode(params)) params=NotifyOffice365.urlencode(params))
def __len__(self): def __len__(self):
@ -687,6 +735,7 @@ class NotifyOffice365(NotifyBase):
# Initialize our tenant # Initialize our tenant
results['tenant'] = None results['tenant'] = None
# Initialize our email # Initialize our email
results['email'] = None results['email'] = None
@ -697,28 +746,36 @@ class NotifyOffice365(NotifyBase):
results['email'] = \ results['email'] = \
NotifyOffice365.unquote(results['qsd']['from']) NotifyOffice365.unquote(results['qsd']['from'])
# Hostname is no longer part of `from` and possibly instead # If tenant is occupied, then the user defined makes up our email
# is the tenant id elif results['user']:
entries.insert(0, NotifyOffice365.unquote(results['host']))
# Tenant
if 'tenant' in results['qsd'] and \
len(results['qsd']['tenant']):
# Extract the Tenant from the argument
results['tenant'] = \
NotifyOffice365.unquote(results['qsd']['tenant'])
# If tenant is occupied, then the user defined makes
# up our email
if not results['email'] and results['user']:
results['email'] = '{}@{}'.format( results['email'] = '{}@{}'.format(
NotifyOffice365.unquote(results['user']), NotifyOffice365.unquote(results['user']),
NotifyOffice365.unquote(results['host']), NotifyOffice365.unquote(results['host']),
) )
elif not results['user']: else:
# Only tenant id specified (emails are sent 'from me') # Hostname is no longer part of `from` and possibly instead
results['tenant'] = NotifyOffice365.unquote(results['host']) # is the tenant id
entries.insert(0, NotifyOffice365.unquote(results['host']))
# Tenant
if 'tenant' in results['qsd'] and len(results['qsd']['tenant']):
# Extract the Tenant from the argument
results['tenant'] = \
NotifyOffice365.unquote(results['qsd']['tenant'])
elif entries:
results['tenant'] = NotifyOffice365.unquote(entries.pop(0))
# OAuth2 ID
if 'oauth_id' in results['qsd'] and len(results['qsd']['oauth_id']):
# Extract the API Key from an argument
results['client_id'] = \
NotifyOffice365.unquote(results['qsd']['oauth_id'])
elif entries:
# Get our client_id is the first entry on the path
results['client_id'] = NotifyOffice365.unquote(entries.pop(0))
# #
# Prepare our target listing # Prepare our target listing
@ -740,16 +797,6 @@ class NotifyOffice365(NotifyBase):
# We're done # We're done
break break
# OAuth2 ID
if 'oauth_id' in results['qsd'] and len(results['qsd']['oauth_id']):
# Extract the API Key from an argument
results['client_id'] = \
NotifyOffice365.unquote(results['qsd']['oauth_id'])
elif entries:
# Get our client_id is the first entry on the path
results['client_id'] = NotifyOffice365.unquote(entries.pop(0))
# OAuth2 Secret # OAuth2 Secret
if 'oauth_secret' in results['qsd'] and \ if 'oauth_secret' in results['qsd'] and \
len(results['qsd']['oauth_secret']): len(results['qsd']['oauth_secret']):
@ -778,4 +825,8 @@ class NotifyOffice365(NotifyBase):
if 'bcc' in results['qsd'] and len(results['qsd']['bcc']): if 'bcc' in results['qsd'] and len(results['qsd']['bcc']):
results['bcc'] = results['qsd']['bcc'] results['bcc'] = results['qsd']['bcc']
# Handle Mode
if 'mode' in results['qsd'] and len(results['qsd']['mode']):
results['mode'] = results['qsd']['mode']
return results return results

View File

@ -34,6 +34,8 @@ import requests
from datetime import datetime from datetime import datetime
from json import dumps from json import dumps
from apprise import Apprise from apprise import Apprise
from apprise import NotifyType
from apprise import AppriseAttachment
from apprise.plugins.office365 import NotifyOffice365 from apprise.plugins.office365 import NotifyOffice365
from helpers import AppriseURLTester from helpers import AppriseURLTester
@ -57,7 +59,7 @@ apprise_url_tests = (
# invalid url # invalid url
'instance': TypeError, 'instance': TypeError,
}), }),
('o365://{tenant}:{aid}/{cid}/{secret}/{targets}'.format( ('o365://{aid}/{tenant}/{cid}/{secret}/{targets}'.format(
# invalid tenant # invalid tenant
tenant=',', tenant=',',
cid='ab-cd-ef-gh', cid='ab-cd-ef-gh',
@ -65,10 +67,10 @@ apprise_url_tests = (
secret='abcd/123/3343/@jack/test', secret='abcd/123/3343/@jack/test',
targets='/'.join(['email1@test.ca'])), { targets='/'.join(['email1@test.ca'])), {
# We're valid and good to go # Expected failure
'instance': TypeError, 'instance': TypeError,
}), }),
('o365://{tenant}:{aid}/{cid}/{secret}/{targets}'.format( ('o365://{aid}/{tenant}/{cid}/{secret}/{targets}'.format(
tenant='tenant', tenant='tenant',
# invalid client id # invalid client id
cid='ab.', cid='ab.',
@ -76,16 +78,53 @@ apprise_url_tests = (
secret='abcd/123/3343/@jack/test', secret='abcd/123/3343/@jack/test',
targets='/'.join(['email1@test.ca'])), { targets='/'.join(['email1@test.ca'])), {
# We're valid and good to go # Expected failure
'instance': TypeError, 'instance': TypeError,
}), }),
('o365://{tenant}:{aid}/{cid}/{secret}/{targets}'.format( ('o365://{aid}/{tenant}/{cid}/{secret}/{targets}?mode=invalid'.format(
# Invalid mode
tenant='tenant', tenant='tenant',
cid='ab-cd-ef-gh', cid='ab-cd-ef-gh',
aid='user@example.com', aid='user@example.com',
secret='abcd/123/3343/@jack/test', secret='abcd/123/3343/@jack/test',
targets='/'.join(['email1@test.ca'])), { targets='/'.join(['email1@test.ca'])), {
# Expected failure
'instance': TypeError,
}),
('o365://{tenant}/{cid}/{secret}/{targets}?mode=user'.format(
# Invalid mode when no email specified
tenant='tenant',
cid='ab-cd-ef-gh',
secret='abcd/123/3343/@jack/test',
targets='/'.join(['email1@test.ca'])), {
# Expected failure
'instance': TypeError,
}),
('o365://{tenant}/{cid}/{secret}/{targets}?mode=self'.format(
# email not required if mode is set to self
tenant='tenant',
cid='ab-cd-ef-gh',
secret='abcd/123/3343/@jack/test',
targets='/'.join(['email1@test.ca'])), {
# We're valid and good to go
'instance': NotifyOffice365,
# Test what happens if a batch send fails to return a messageCount
'requests_response_text': {
'expires_in': 2000,
'access_token': 'abcd1234',
},
}),
('o365://{aid}/{tenant}/{cid}/{secret}/{targets}'.format(
tenant='tenant',
cid='ab-cd-ef-gh',
aid='user@example.edu',
secret='abcd/123/3343/@jack/test',
targets='/'.join(['email1@test.ca'])), {
# We're valid and good to go # We're valid and good to go
'instance': NotifyOffice365, 'instance': NotifyOffice365,
@ -96,14 +135,14 @@ apprise_url_tests = (
}, },
# Our expected url(privacy=True) startswith() response: # Our expected url(privacy=True) startswith() response:
'privacy_url': 'o365://t...t:user@example.com/a...h/' 'privacy_url': 'azure://user@example.edu/t...t/a...h/'
'****/email1%40test.ca/'}), '****/email1@test.ca/'}),
# test our arguments # test our arguments
('o365://_/?oauth_id={cid}&oauth_secret={secret}&tenant={tenant}' ('o365://_/?oauth_id={cid}&oauth_secret={secret}&tenant={tenant}'
'&to={targets}&from={aid}'.format( '&to={targets}&from={aid}'.format(
tenant='tenant', tenant='tenant',
cid='ab-cd-ef-gh', cid='ab-cd-ef-gh',
aid='user@example.com', aid='user@example.ca',
secret='abcd/123/3343/@jack/test', secret='abcd/123/3343/@jack/test',
targets='email1@test.ca'), targets='email1@test.ca'),
{ {
@ -117,10 +156,10 @@ apprise_url_tests = (
}, },
# Our expected url(privacy=True) startswith() response: # Our expected url(privacy=True) startswith() response:
'privacy_url': 'o365://t...t:user@example.com/a...h/' 'privacy_url': 'azure://user@example.ca/t...t/a...h/'
'****/email1%40test.ca/'}), '****/email1@test.ca/'}),
# Test invalid JSON (no tenant defaults to email domain) # Test invalid JSON (no tenant defaults to email domain)
('o365://{tenant}:{aid}/{cid}/{secret}/{targets}'.format( ('o365://{aid}/{tenant}/{cid}/{secret}/{targets}'.format(
tenant='tenant', tenant='tenant',
cid='ab-cd-ef-gh', cid='ab-cd-ef-gh',
aid='user@example.com', aid='user@example.com',
@ -135,7 +174,7 @@ apprise_url_tests = (
'notify_response': False, 'notify_response': False,
}), }),
# No Targets specified # No Targets specified
('o365://{tenant}:{aid}/{cid}/{secret}'.format( ('o365://{aid}/{tenant}/{cid}/{secret}'.format(
tenant='tenant', tenant='tenant',
cid='ab-cd-ef-gh', cid='ab-cd-ef-gh',
aid='user@example.com', aid='user@example.com',
@ -150,7 +189,7 @@ apprise_url_tests = (
'access_token': 'abcd1234', 'access_token': 'abcd1234',
}, },
}), }),
('o365://{tenant}:{aid}/{cid}/{secret}/{targets}'.format( ('o365://{aid}/{tenant}/{cid}/{secret}/{targets}'.format(
tenant='tenant', tenant='tenant',
cid='zz-zz-zz-zz', cid='zz-zz-zz-zz',
aid='user@example.com', aid='user@example.com',
@ -212,7 +251,7 @@ def test_plugin_office365_general(mock_post):
# Instantiate our object # Instantiate our object
obj = Apprise.instantiate( obj = Apprise.instantiate(
'o365://{tenant}:{email}/{tenant}/{secret}/{targets}'.format( 'o365://{email}/{tenant}/{secret}/{targets}'.format(
tenant=tenant, tenant=tenant,
email=email, email=email,
secret=secret, secret=secret,
@ -228,10 +267,11 @@ def test_plugin_office365_general(mock_post):
# Instantiate our object # Instantiate our object
obj = Apprise.instantiate( obj = Apprise.instantiate(
'o365://{tenant}:{email}/{tenant}/{secret}/{targets}' 'o365://{email}/{tenant}/{client_id}/{secret}/{targets}'
'?bcc={bcc}&cc={cc}'.format( '?bcc={bcc}&cc={cc}'.format(
tenant=tenant, tenant=tenant,
email=email, email=email,
client_id=client_id,
secret=secret, secret=secret,
targets=targets, targets=targets,
# Test the cc and bcc list (use good and bad email) # Test the cc and bcc list (use good and bad email)
@ -260,7 +300,7 @@ def test_plugin_office365_general(mock_post):
with pytest.raises(TypeError): with pytest.raises(TypeError):
# Invalid email # Invalid email
NotifyOffice365( NotifyOffice365(
email=None, email='invalid',
client_id=client_id, client_id=client_id,
tenant=tenant, tenant=tenant,
secret=secret, secret=secret,
@ -336,7 +376,7 @@ def test_plugin_office365_authentication(mock_post):
# Instantiate our object # Instantiate our object
obj = Apprise.instantiate( obj = Apprise.instantiate(
'o365://{tenant}:{email}/{client_id}/{secret}/{targets}'.format( 'azure://{email}/{tenant}/{client_id}/{secret}/{targets}'.format(
client_id=client_id, client_id=client_id,
tenant=tenant, tenant=tenant,
email=email, email=email,
@ -394,3 +434,64 @@ def test_plugin_office365_authentication(mock_post):
del invalid_auth_entries['expires_in'] del invalid_auth_entries['expires_in']
response.content = dumps(invalid_auth_entries) response.content = dumps(invalid_auth_entries)
assert obj.authenticate() is False assert obj.authenticate() is False
@mock.patch('requests.post')
def test_plugin_office365_attachments(mock_post):
"""
NotifyOffice365() Attachments
"""
# Initialize some generic (but valid) tokens
email = 'user@example.net'
tenant = 'ff-gg-hh-ii-jj'
client_id = 'aa-bb-cc-dd-ee'
secret = 'abcd/1234/abcd@ajd@/test'
targets = 'target@example.com'
# Prepare Mock return object
authentication = {
"token_type": "Bearer",
"expires_in": 6000,
"access_token": "abcd1234"
}
okay_response = mock.Mock()
okay_response.content = dumps(authentication)
okay_response.status_code = requests.codes.ok
mock_post.return_value = okay_response
# Instantiate our object
obj = Apprise.instantiate(
'azure://{email}/{tenant}/{client_id}{secret}/{targets}'.format(
client_id=client_id,
tenant=tenant,
email=email,
secret=secret,
targets=targets))
assert isinstance(obj, NotifyOffice365)
# Test Valid Attachment
path = os.path.join(TEST_VAR_DIR, 'apprise-test.gif')
attach = AppriseAttachment(path)
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=attach) is True
# Test invalid attachment
path = os.path.join(TEST_VAR_DIR, '/invalid/path/to/an/invalid/file.jpg')
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=path) is False
with mock.patch('base64.b64encode', side_effect=OSError()):
# We can't send the message if we fail to parse the data
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=attach) is False
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=attach) is True
assert mock_post.call_count == 3