mirror of https://github.com/caronc/apprise
Ntfy Authentication Token support added (#830)
parent
87e49ce68c
commit
4c542070eb
|
@ -74,6 +74,27 @@ NTFY_MODES = (
|
|||
NtfyMode.PRIVATE,
|
||||
)
|
||||
|
||||
# A Simple regular expression used to auto detect Auth mode if it isn't
|
||||
# otherwise specified:
|
||||
NTFY_AUTH_DETECT_RE = re.compile('tk_[^ \t]+', re.IGNORECASE)
|
||||
|
||||
|
||||
class NtfyAuth:
|
||||
"""
|
||||
Define ntfy Authentication Modes
|
||||
"""
|
||||
# Basic auth (user and password provided)
|
||||
BASIC = "basic"
|
||||
|
||||
# Auth Token based
|
||||
TOKEN = "token"
|
||||
|
||||
|
||||
NTFY_AUTH = (
|
||||
NtfyAuth.BASIC,
|
||||
NtfyAuth.TOKEN,
|
||||
)
|
||||
|
||||
|
||||
class NtfyPriority:
|
||||
"""
|
||||
|
@ -170,6 +191,8 @@ class NotifyNtfy(NotifyBase):
|
|||
'{schema}://{user}@{host}:{port}/{targets}',
|
||||
'{schema}://{user}:{password}@{host}/{targets}',
|
||||
'{schema}://{user}:{password}@{host}:{port}/{targets}',
|
||||
'{schema}://{token}@{host}/{targets}',
|
||||
'{schema}://{token}@{host}:{port}/{targets}',
|
||||
)
|
||||
|
||||
# Define our template tokens
|
||||
|
@ -193,6 +216,11 @@ class NotifyNtfy(NotifyBase):
|
|||
'type': 'string',
|
||||
'private': True,
|
||||
},
|
||||
'token': {
|
||||
'name': _('Token'),
|
||||
'type': 'string',
|
||||
'private': True,
|
||||
},
|
||||
'topic': {
|
||||
'name': _('Topic'),
|
||||
'type': 'string',
|
||||
|
@ -253,6 +281,15 @@ class NotifyNtfy(NotifyBase):
|
|||
'values': NTFY_MODES,
|
||||
'default': NtfyMode.PRIVATE,
|
||||
},
|
||||
'token': {
|
||||
'alias_of': 'token',
|
||||
},
|
||||
'auth': {
|
||||
'name': _('Authentication Type'),
|
||||
'type': 'choice:string',
|
||||
'values': NTFY_AUTH,
|
||||
'default': NtfyAuth.BASIC,
|
||||
},
|
||||
'to': {
|
||||
'alias_of': 'targets',
|
||||
},
|
||||
|
@ -260,7 +297,8 @@ class NotifyNtfy(NotifyBase):
|
|||
|
||||
def __init__(self, targets=None, attach=None, filename=None, click=None,
|
||||
delay=None, email=None, priority=None, tags=None, mode=None,
|
||||
include_image=True, avatar_url=None, **kwargs):
|
||||
include_image=True, avatar_url=None, auth=None, token=None,
|
||||
**kwargs):
|
||||
"""
|
||||
Initialize ntfy Object
|
||||
"""
|
||||
|
@ -279,6 +317,17 @@ class NotifyNtfy(NotifyBase):
|
|||
# Show image associated with notification
|
||||
self.include_image = include_image
|
||||
|
||||
# Prepare our authentication type
|
||||
self.auth = auth.strip().lower() \
|
||||
if isinstance(auth, str) \
|
||||
else self.template_args['auth']['default']
|
||||
|
||||
if self.auth not in NTFY_AUTH:
|
||||
msg = 'An invalid ntfy Authentication type ({}) was specified.' \
|
||||
.format(auth)
|
||||
self.logger.warning(msg)
|
||||
raise TypeError(msg)
|
||||
|
||||
# Attach a file (URL supported)
|
||||
self.attach = attach
|
||||
|
||||
|
@ -294,6 +343,9 @@ class NotifyNtfy(NotifyBase):
|
|||
# An email to forward notifications to
|
||||
self.email = email
|
||||
|
||||
# Save our token
|
||||
self.token = token
|
||||
|
||||
# The Priority of the message
|
||||
self.priority = NotifyNtfy.template_args['priority']['default'] \
|
||||
if not priority else \
|
||||
|
@ -418,9 +470,17 @@ class NotifyNtfy(NotifyBase):
|
|||
|
||||
else: # NotifyNtfy.PRVATE
|
||||
# Allow more settings to be applied now
|
||||
if self.user:
|
||||
if self.auth == NtfyAuth.BASIC and self.user:
|
||||
auth = (self.user, self.password)
|
||||
|
||||
elif self.auth == NtfyAuth.TOKEN:
|
||||
if not self.token:
|
||||
self.logger.warning('No Ntfy Token was specified')
|
||||
return False, None
|
||||
|
||||
# Set Token
|
||||
headers['Authorization'] = f'Bearer {self.token}'
|
||||
|
||||
# Prepare our ntfy Template URL
|
||||
schema = 'https' if self.secure else 'http'
|
||||
|
||||
|
@ -575,6 +635,7 @@ class NotifyNtfy(NotifyBase):
|
|||
'priority': self.priority,
|
||||
'mode': self.mode,
|
||||
'image': 'yes' if self.include_image else 'no',
|
||||
'auth': self.auth,
|
||||
}
|
||||
|
||||
if self.avatar_url:
|
||||
|
@ -599,17 +660,24 @@ class NotifyNtfy(NotifyBase):
|
|||
|
||||
# Determine Authentication
|
||||
auth = ''
|
||||
if self.auth == NtfyAuth.BASIC:
|
||||
if self.user and self.password:
|
||||
auth = '{user}:{password}@'.format(
|
||||
user=NotifyNtfy.quote(self.user, safe=''),
|
||||
password=self.pprint(
|
||||
self.password, privacy, mode=PrivacyMode.Secret, safe=''),
|
||||
self.password, privacy, mode=PrivacyMode.Secret,
|
||||
safe=''),
|
||||
)
|
||||
elif self.user:
|
||||
auth = '{user}@'.format(
|
||||
user=NotifyNtfy.quote(self.user, safe=''),
|
||||
)
|
||||
|
||||
elif self.token: # NtfyAuth.TOKEN also
|
||||
auth = '{token}@'.format(
|
||||
token=self.pprint(self.token, privacy, safe=''),
|
||||
)
|
||||
|
||||
if self.mode == NtfyMode.PRIVATE:
|
||||
return '{schema}://{auth}{host}{port}/{targets}?{params}'.format(
|
||||
schema=self.secure_protocol if self.secure else self.protocol,
|
||||
|
@ -689,6 +757,37 @@ class NotifyNtfy(NotifyBase):
|
|||
results['targets'] += \
|
||||
NotifyNtfy.parse_list(results['qsd']['to'])
|
||||
|
||||
# Token Specified
|
||||
if 'token' in results['qsd'] and len(results['qsd']['token']):
|
||||
# Token presumed to be the one in use
|
||||
results['auth'] = NtfyAuth.TOKEN
|
||||
results['token'] = NotifyNtfy.unquote(results['qsd']['token'])
|
||||
|
||||
# Auth override
|
||||
if 'auth' in results['qsd'] and results['qsd']['auth']:
|
||||
results['auth'] = NotifyNtfy.unquote(
|
||||
results['qsd']['auth'].strip().lower())
|
||||
|
||||
if not results.get('auth') and results['user'] \
|
||||
and not results['password']:
|
||||
# We can try to detect the authentication type on the formatting of
|
||||
# the username. Look for tk_.*
|
||||
#
|
||||
# This isn't a surfire way to do things though; it's best to
|
||||
# specify the auth= flag
|
||||
results['auth'] = NtfyAuth.TOKEN \
|
||||
if NTFY_AUTH_DETECT_RE.match(results['user']) \
|
||||
else NtfyAuth.BASIC
|
||||
|
||||
if results.get('auth') == NtfyAuth.TOKEN and not results.get('token'):
|
||||
if results['user'] and not results['password']:
|
||||
# Make sure we properly set our token
|
||||
results['token'] = NotifyNtfy.unquote(results['user'])
|
||||
|
||||
elif results['password']:
|
||||
# Make sure we properly set our token
|
||||
results['token'] = NotifyNtfy.unquote(results['password'])
|
||||
|
||||
# Mode override
|
||||
if 'mode' in results['qsd'] and results['qsd']['mode']:
|
||||
results['mode'] = NotifyNtfy.unquote(
|
||||
|
|
|
@ -99,6 +99,8 @@ apprise_url_tests = (
|
|||
('ntfy://user@localhost/topic/', {
|
||||
'instance': NotifyNtfy,
|
||||
'requests_response_text': GOOD_RESPONSE_TEXT,
|
||||
# Our expected url(privacy=True) startswith() response:
|
||||
'privacy_url': 'ntfy://user@localhost/topic',
|
||||
}),
|
||||
# Ntfy cloud mode (enforced)
|
||||
('ntfy://ntfy.sh/topic1/topic2/', {
|
||||
|
@ -165,10 +167,55 @@ apprise_url_tests = (
|
|||
'instance': NotifyNtfy,
|
||||
'requests_response_text': GOOD_RESPONSE_TEXT,
|
||||
}),
|
||||
# Auth Token Types (tk_ gets detected as a auth=token)
|
||||
('ntfy://tk_abcd123456@localhost/topic1', {
|
||||
'instance': NotifyNtfy,
|
||||
'requests_response_text': GOOD_RESPONSE_TEXT,
|
||||
# Our expected url(privacy=True) startswith() response:
|
||||
'privacy_url': 'ntfy://t...6@localhost/topic1',
|
||||
}),
|
||||
# Force an auth token since lack of tk_ prevents auto-detection
|
||||
('ntfy://abcd123456@localhost/topic1?auth=token', {
|
||||
'instance': NotifyNtfy,
|
||||
'requests_response_text': GOOD_RESPONSE_TEXT,
|
||||
# Our expected url(privacy=True) startswith() response:
|
||||
'privacy_url': 'ntfy://a...6@localhost/topic1',
|
||||
}),
|
||||
# Force an auth token since lack of tk_ prevents auto-detection
|
||||
('ntfy://:abcd123456@localhost/topic1?auth=token', {
|
||||
'instance': NotifyNtfy,
|
||||
'requests_response_text': GOOD_RESPONSE_TEXT,
|
||||
# Our expected url(privacy=True) startswith() response:
|
||||
'privacy_url': 'ntfy://a...6@localhost/topic1',
|
||||
}),
|
||||
# Token detection already implied when token keyword is set
|
||||
('ntfy://localhost/topic1?token=abc1234', {
|
||||
'instance': NotifyNtfy,
|
||||
'requests_response_text': GOOD_RESPONSE_TEXT,
|
||||
# Our expected url(privacy=True) startswith() response:
|
||||
'privacy_url': 'ntfy://a...4@localhost/topic1',
|
||||
}),
|
||||
# Token enforced, but since a user/pass provided, only the pass is kept
|
||||
('ntfy://user:token@localhost/topic1?auth=token', {
|
||||
'instance': NotifyNtfy,
|
||||
'requests_response_text': GOOD_RESPONSE_TEXT,
|
||||
# Our expected url(privacy=True) startswith() response:
|
||||
'privacy_url': 'ntfy://t...n@localhost/topic1',
|
||||
}),
|
||||
# Token mode force, but there was no token provided
|
||||
('ntfy://localhost/topic1?auth=token', {
|
||||
'instance': NotifyNtfy,
|
||||
# We'll out-right fail to send the notification
|
||||
'response': False,
|
||||
# Our expected url(privacy=True) startswith() response:
|
||||
'privacy_url': 'ntfy://localhost/topic1',
|
||||
}),
|
||||
# Priority
|
||||
('ntfy://localhost/topic1/?priority=default', {
|
||||
'instance': NotifyNtfy,
|
||||
'requests_response_text': GOOD_RESPONSE_TEXT,
|
||||
# Our expected url(privacy=True) startswith() response:
|
||||
'privacy_url': 'ntfy://localhost/topic1',
|
||||
}),
|
||||
# Priority higher
|
||||
('ntfy://localhost/topic1/?priority=high', {
|
||||
|
@ -213,6 +260,10 @@ apprise_url_tests = (
|
|||
# Invalid mode
|
||||
'instance': TypeError,
|
||||
}),
|
||||
('ntfys://token@localhost/topic/?auth=invalid', {
|
||||
# Invalid Authentication type
|
||||
'instance': TypeError,
|
||||
}),
|
||||
# Invalid hostname on localhost/private mode
|
||||
('ntfys://user:web@-_/topic1/topic2/?mode=private', {
|
||||
'instance': None,
|
||||
|
|
Loading…
Reference in New Issue