Added Splunk/VictorOps Support (#1125)

pull/1128/head
Chris Caron 2024-05-16 17:55:04 -04:00 committed by GitHub
parent 9d98347dcb
commit 45e67beae3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 673 additions and 4 deletions

View File

@ -90,6 +90,7 @@ SMS Manager
SMTP2Go SMTP2Go
SNS SNS
SparkPost SparkPost
Splunk
Streamlabs Streamlabs
Stride Stride
Synology Chat Synology Chat
@ -100,6 +101,7 @@ Threema Gateway
Twilio Twilio
Twist Twist
Twitter Twitter
VictorOps
Voipms Voipms
Vonage Vonage
Webex Webex

View File

@ -120,8 +120,9 @@ The table below identifies the services this tool supports and some example serv
| [SimplePush](https://github.com/caronc/apprise/wiki/Notify_simplepush) | spush:// | (TCP) 443 | spush://apikey<br />spush://salt:password@apikey<br />spush://apikey?event=Apprise | [SimplePush](https://github.com/caronc/apprise/wiki/Notify_simplepush) | spush:// | (TCP) 443 | spush://apikey<br />spush://salt:password@apikey<br />spush://apikey?event=Apprise
| [Slack](https://github.com/caronc/apprise/wiki/Notify_slack) | slack:// | (TCP) 443 | slack://TokenA/TokenB/TokenC/<br />slack://TokenA/TokenB/TokenC/Channel<br />slack://botname@TokenA/TokenB/TokenC/Channel<br />slack://user@TokenA/TokenB/TokenC/Channel1/Channel2/ChannelN | [Slack](https://github.com/caronc/apprise/wiki/Notify_slack) | slack:// | (TCP) 443 | slack://TokenA/TokenB/TokenC/<br />slack://TokenA/TokenB/TokenC/Channel<br />slack://botname@TokenA/TokenB/TokenC/Channel<br />slack://user@TokenA/TokenB/TokenC/Channel1/Channel2/ChannelN
| [SMTP2Go](https://github.com/caronc/apprise/wiki/Notify_smtp2go) | smtp2go:// | (TCP) 443 | smtp2go://user@hostname/apikey<br />smtp2go://user@hostname/apikey/email<br />smtp2go://user@hostname/apikey/email1/email2/emailN<br />smtp2go://user@hostname/apikey/?name="From%20User" | [SMTP2Go](https://github.com/caronc/apprise/wiki/Notify_smtp2go) | smtp2go:// | (TCP) 443 | smtp2go://user@hostname/apikey<br />smtp2go://user@hostname/apikey/email<br />smtp2go://user@hostname/apikey/email1/email2/emailN<br />smtp2go://user@hostname/apikey/?name="From%20User"
| [SparkPost](https://github.com/caronc/apprise/wiki/Notify_sparkpost) | sparkpost:// | (TCP) 443 | sparkpost://route_key@apikey<br />splunk://route_key@apikey/entity_id
| [Spunk](https://github.com/caronc/apprise/wiki/Notify_splunk) | splunk:// or victorops:/ | (TCP) 443 | splunk://user@hostname/apikey<br />sparkpost://user@hostname/apikey/email<br />sparkpost://user@hostname/apikey/email1/email2/emailN<br />sparkpost://user@hostname/apikey/?name="From%20User"
| [Streamlabs](https://github.com/caronc/apprise/wiki/Notify_streamlabs) | strmlabs:// | (TCP) 443 | strmlabs://AccessToken/<br/>strmlabs://AccessToken/?name=name&identifier=identifier&amount=0&currency=USD | [Streamlabs](https://github.com/caronc/apprise/wiki/Notify_streamlabs) | strmlabs:// | (TCP) 443 | strmlabs://AccessToken/<br/>strmlabs://AccessToken/?name=name&identifier=identifier&amount=0&currency=USD
| [SparkPost](https://github.com/caronc/apprise/wiki/Notify_sparkpost) | sparkpost:// | (TCP) 443 | sparkpost://user@hostname/apikey<br />sparkpost://user@hostname/apikey/email<br />sparkpost://user@hostname/apikey/email1/email2/emailN<br />sparkpost://user@hostname/apikey/?name="From%20User"
| [Synology Chat](https://github.com/caronc/apprise/wiki/Notify_synology_chat) | synology:// or synologys:// | (TCP) 80 or 443 | synology://hostname/token<br />synology://hostname:port/token | [Synology Chat](https://github.com/caronc/apprise/wiki/Notify_synology_chat) | synology:// or synologys:// | (TCP) 80 or 443 | synology://hostname/token<br />synology://hostname:port/token
| [Syslog](https://github.com/caronc/apprise/wiki/Notify_syslog) | syslog:// | n/a | syslog://<br />syslog://Facility | [Syslog](https://github.com/caronc/apprise/wiki/Notify_syslog) | syslog:// | n/a | syslog://<br />syslog://Facility
| [Telegram](https://github.com/caronc/apprise/wiki/Notify_telegram) | tgram:// | (TCP) 443 | tgram://bottoken/ChatID<br />tgram://bottoken/ChatID1/ChatID2/ChatIDN | [Telegram](https://github.com/caronc/apprise/wiki/Notify_telegram) | tgram:// | (TCP) 443 | tgram://bottoken/ChatID<br />tgram://bottoken/ChatID1/ChatID2/ChatIDN

488
apprise/plugins/splunk.py Normal file
View File

@ -0,0 +1,488 @@
# -*- coding: utf-8 -*-
# BSD 2-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2024, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# Splunk On-Call
# API: https://portal.victorops.com/public/api-docs.html
# Main: https://www.splunk.com/en_us/products/on-call.html
# Routing Keys https://help.victorops.com/knowledge-base/routing-keys/
# Setup: https://help.victorops.com/knowledge-base/rest-endpoint-integration\
# -guide/
import re
import requests
from json import dumps
from .base import NotifyBase
from ..common import NotifyType, NOTIFY_TYPES
from ..utils import validate_regex
from ..locale import gettext_lazy as _
class SplunkAction:
"""
Tracks the actions supported by Apprise Splunk Plugin
"""
# Use mapping (specify :key=arg to over-ride)
MAP = 'map'
# Creates a timeline event but does not trigger an incident
INFO = 'info'
# Triggers a warning (possibly causing incident) in all cases
WARNING = 'warning'
# Triggers an incident in all cases
CRITICAL = 'critical'
# Acknowldege entity_id provided in all cases
ACKNOWLEDGE = 'acknowledgement'
# Recovery entity_id provided in all cases
RECOVERY = 'recovery'
# Resolve (aliase of Recover)
RESOLVE = 'resolve'
# Define our Splunk Actions
SPLUNK_ACTIONS = (
SplunkAction.MAP,
SplunkAction.INFO,
SplunkAction.ACKNOWLEDGE,
SplunkAction.WARNING,
SplunkAction.RECOVERY,
SplunkAction.RESOLVE,
SplunkAction.CRITICAL,
)
class SplunkMessageType:
"""
Defines the supported splunk message types
"""
# Triggers an incident
CRITICAL = 'CRITICAL'
# May trigger an incident, depending on your settings
WARNING = 'WARNING'
# Acks an incident
ACKNOWLEDGEMENT = 'ACKNOWLEDGEMENT'
# Creates a timeline event but does not trigger an incident
INFO = 'INFO'
# Resolves an incident
RECOVERY = 'RECOVERY'
# Defines our supported message types
SPLUNK_MESSAGE_TYPES = (
SplunkMessageType.CRITICAL,
SplunkMessageType.WARNING,
SplunkMessageType.ACKNOWLEDGEMENT,
SplunkMessageType.INFO,
SplunkMessageType.RECOVERY,
)
class NotifySplunk(NotifyBase):
"""
A wrapper for Splunk Notifications
"""
# The default descriptive name associated with the Notification
service_name = _('Splunk On-Call')
# The services URL
service_url = 'https://www.splunk.com/en_us/products/on-call.html'
# The default secure protocol
secure_protocol = ('splunk', 'victorops')
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_splunk'
# Notification URL
notify_url = 'https://alert.victorops.com/integrations/generic/20131114/'\
'alert/{apikey}/{routing_key}'
# Define object templates
templates = (
'{schema}://{routing_key}@{apikey}',
'{schema}://{routing_key}@{apikey}/{entity_id}',
)
# The title is not used
title_maxlen = 60
# body limit
body_maxlen = 400
# Defines our default message mapping
splunk_message_map = {
# Creates a timeline event but doesnot trigger an incident
NotifyType.INFO: SplunkMessageType.INFO,
# Resolves an incident
NotifyType.SUCCESS: SplunkMessageType.RECOVERY,
# May trigger an incident, depending on your settings
NotifyType.WARNING: SplunkMessageType.WARNING,
# Triggers an incident
NotifyType.FAILURE: SplunkMessageType.CRITICAL,
}
# Define our tokens; these are the minimum tokens required required to
# be passed into this function (as arguments). The syntax appends any
# previously defined in the base package and builds onto them
template_tokens = dict(NotifyBase.template_tokens, **{
'apikey': {
'name': _('API Key'),
'type': 'string',
'private': True,
'required': True,
'regex': (r'^[A-Z0-9_-]+$', 'i'),
},
'routing_key': {
'name': _('Target Routing Key'),
'type': 'string',
'required': True,
'regex': (r'^[A-Z0-9_-]+$', 'i'),
},
'entity_id': {
# Provide a value such as: "disk space/db01.mycompany.com"
'name': _('Entity ID'),
'type': 'string',
},
})
# Define our template arguments
template_args = dict(NotifyBase.template_args, **{
'apikey': {
'alias_of': 'apikey',
},
'routing_key': {
'alias_of': 'routing_key',
},
'route': {
'alias_of': 'routing_key',
},
'entity_id': {
'alias_of': 'entity_id',
},
'action': {
'name': _('Action'),
'type': 'choice:string',
'values': SPLUNK_ACTIONS,
'default': SPLUNK_ACTIONS[0],
}
})
# Define any kwargs we're using
template_kwargs = {
'mapping': {
'name': _('Action Mapping'),
'prefix': ':',
},
}
def __init__(self, apikey, routing_key, entity_id=None, action=None,
mapping=None, **kwargs):
"""
Initialize Splunk Object
"""
super().__init__(**kwargs)
self.apikey = validate_regex(
apikey, *self.template_tokens['apikey']['regex'])
if not self.apikey:
msg = 'The Splunk API Key specified ({}) is invalid.'\
.format(apikey)
self.logger.warning(msg)
raise TypeError(msg)
self.routing_key = validate_regex(
routing_key, *self.template_tokens['routing_key']['regex'])
if not self.routing_key:
msg = 'The Splunk Routing Key specified ({}) is invalid.'\
.format(routing_key)
self.logger.warning(msg)
raise TypeError(msg)
if not (isinstance(entity_id, str)
and len(entity_id.strip(' \r\n\t\v/'))):
# Use routing key
self.entity_id = f"{self.app_id}/{self.routing_key}"
else:
# Assign what was defined:
self.entity_id = entity_id.strip(' \r\n\t\v/')
if isinstance(action, str) and action:
self.action = next(
(a for a in SPLUNK_ACTIONS if a.startswith(action)), None)
if self.action not in SPLUNK_ACTIONS:
msg = 'The Splunk action specified ({}) is invalid.'\
.format(action)
self.logger.warning(msg)
raise TypeError(msg)
else:
self.action = self.template_args['action']['default'] \
# Store our mappings
self.mapping = self.splunk_message_map.copy()
if mapping and isinstance(mapping, dict):
for _k, _v in mapping.items():
# Get our mapping
k = next((t for t in NOTIFY_TYPES if t.startswith(_k)), None)
if not k:
msg = 'The Splunk mapping key specified ({}) is invalid.'\
.format(_k)
self.logger.warning(msg)
raise TypeError(msg)
_v_upper = _v.upper()
v = next((v for v in SPLUNK_MESSAGE_TYPES
if v.startswith(_v_upper)), None)
if not v:
msg = 'The Splunk mapping value (assigned to {}) ' \
'specified ({}) is invalid.'.format(k, _v)
self.logger.warning(msg)
raise TypeError(msg)
# Update our mapping
self.mapping[k] = v
return
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Send our notification
"""
# prepare our headers
headers = {
'User-Agent': self.app_id,
'Content-Type': "application/json",
}
# Set up our message type
if self.action == SplunkAction.MAP:
# Use Mapping
message_type = self.mapping[notify_type]
elif self.action == SplunkAction.ACKNOWLEDGE:
# Always Acknowledge
message_type = SplunkMessageType.ACKNOWLEDGEMENT
elif self.action == SplunkAction.INFO:
# Creates a timeline event but does not trigger an incident
message_type = SplunkMessageType.INFO
elif self.action == SplunkAction.CRITICAL:
# Always create Incident
message_type = SplunkMessageType.CRITICAL
elif self.action == SplunkAction.WARNING:
# Always trigger warning (potentially creating incident)
message_type = SplunkMessageType.WARNING
else: # self.action == SplunkAction.RECOVERY or SplunkAction.RESOLVE
# Always Recover
message_type = SplunkMessageType.RECOVERY
# Prepare our payload
payload = {
"entity_id": self.entity_id,
"message_type": message_type,
"entity_display_name": title if title else self.app_desc,
"state_message": body,
"monitoring_tool": self.app_id,
}
notify_url = self.notify_url.format(
apikey=self.apikey,
routing_key=self.routing_key)
self.logger.debug('Splunk GET URL: %s (cert_verify=%r)' % (
notify_url, self.verify_certificate))
self.logger.debug('Splunk Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
notify_url,
data=dumps(payload).encode('utf-8'),
headers=headers,
verify=self.verify_certificate,
timeout=self.request_timeout,
)
# Sample Response
# {
# "result" : "success",
# "entity_id" : "disk space/db01.mycompany.com"
# }
if r.status_code != requests.codes.ok:
# We had a problem
status_str = \
NotifySplunk.http_response_code_lookup(r.status_code)
self.logger.warning(
'Failed to send Splunk notification: '
'{}{}error={}.'.format(
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug('Response Details:\r\n{}'.format(r.content))
# Return; we're done
return False
else:
self.logger.info('Sent Splunk notification.')
except requests.RequestException as e:
self.logger.warning(
'A Connection error occurred sending Splunk '
'notification.')
self.logger.debug('Socket Exception: %s' % str(e))
# Return; we're done
return False
return True
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any URL parameters
params = {
'action': self.action,
}
# Extend our parameters
params.update(self.url_parameters(privacy=privacy, *args, **kwargs))
# Append our assignment extra's into our parameters
params.update(
{':{}'.format(k): v for k, v in self.mapping.items()})
return '{schema}://{routing_key}@{apikey}/{entity_id}?{params}'.format(
schema=self.secure_protocol[0],
routing_key=self.routing_key,
entity_id='' if self.entity_id == self.routing_key
else self.entity_id,
apikey=self.pprint(self.apikey, privacy, safe=''),
params=NotifySplunk.urlencode(params),
)
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to re-instantiate this object.
"""
# parse_url already handles getting the `user` and `password` fields
# populated.
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results
# Entity ID
if 'entity_id' in results['qsd'] and len(results['qsd']['entity_id']):
results['entity_id'] = \
NotifySplunk.unquote(results['qsd']['entity_id'])
else:
results['entity_id'] = NotifySplunk.unquote(results['fullpath'])
# API Key
if 'apikey' in results['qsd'] and len(results['qsd']['apikey']):
results['apikey'] = NotifySplunk.unquote(results['qsd']['apikey'])
else:
results['apikey'] = NotifySplunk.unquote(results['host'])
# Routing Key
if 'routing_key' in results['qsd'] \
and len(results['qsd']['routing_key']):
results['routing_key'] = \
NotifySplunk.unquote(results['qsd']['routing_key'])
elif 'route' in results['qsd'] and len(results['qsd']['route']):
results['routing_key'] = \
NotifySplunk.unquote(results['qsd']['route'])
else:
results['routing_key'] = NotifySplunk.unquote(results['user'])
# Store our action (if defined)
if 'action' in results['qsd'] and len(results['qsd']['action']):
results['action'] = NotifySplunk.unquote(results['qsd']['action'])
# store any custom mapping defined
results['mapping'] = {NotifySplunk.unquote(x): NotifySplunk.unquote(y)
for x, y in results['qsd:'].items()}
return results
@staticmethod
def parse_native_url(url):
"""
Support https://alert.victorops.com/integrations/generic/20131114/ \
alert/apikey/routing_key
"""
result = re.match(
r'^https?://alert\.victorops\.com/integrations/generic/'
r'(?P<version>[0-9]+)/alert/(?P<apikey>[0-9a-z_-]+)'
r'(/(?P<routing_key>[^?/]+))'
r'(/(?P<entity_id>[^?]+))?/*'
r'(?P<params>\?.+)?$', url, re.I)
if result:
return NotifySplunk.parse_url(
'{schema}://{routing_key}@{apikey}/{entity_id}{params}'.format(
schema=NotifySplunk.secure_protocol[0],
apikey=result.group('apikey'),
routing_key=result.group('routing_key'),
entity_id='' if not result.group('entity_id')
else result.group('entity_id'),
params='' if not result.group('params')
else result.group('params')))
return None

View File

@ -49,9 +49,10 @@ NextcloudTalk, Notica, Notifiarr, Notifico, ntfy, Office365, OneSignal,
Opsgenie, PagerDuty, PagerTree, ParsePlatform, PopcornNotify, Prowl, Pushalot, Opsgenie, PagerDuty, PagerTree, ParsePlatform, PopcornNotify, Prowl, Pushalot,
PushBullet, Pushjet, PushMe, Pushover, PushSafer, Pushy, PushDeer, Revolt, PushBullet, Pushjet, PushMe, Pushover, PushSafer, Pushy, PushDeer, Revolt,
Reddit, Rocket.Chat, RSyslog, SendGrid, ServerChan, Signal, SimplePush, Sinch, Reddit, Rocket.Chat, RSyslog, SendGrid, ServerChan, Signal, SimplePush, Sinch,
Slack, SMSEagle, SMS Manager, SMTP2Go, SparkPost, Super Toasty, Streamlabs, Slack, SMSEagle, SMS Manager, SMTP2Go, SparkPost, Splunk, Super Toasty,
Stride, Synology Chat, Syslog, Techulus Push, Telegram, Threema Gateway, Twilio, Streamlabs, Stride, Synology Chat, Syslog, Techulus Push, Telegram, Threema
Twitter, Twist, XBMC, Voipms, Vonage, WeCom Bot, WhatsApp, Webex Teams} Gateway, Twilio, Twitter, Twist, XBMC, VictorOps, Voipms, Vonage, WeCom Bot,
WhatsApp, Webex Teams}
Name: python-%{pypi_name} Name: python-%{pypi_name}
Version: 1.8.0 Version: 1.8.0

177
test/test_plugin_splunk.py Normal file
View File

@ -0,0 +1,177 @@
# -*- coding: utf-8 -*-
# BSD 2-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2024, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import requests
from apprise.plugins.splunk import NotifySplunk
from helpers import AppriseURLTester
# Disable logging for a cleaner testing output
import logging
logging.disable(logging.CRITICAL)
# Our Testing URLs
apprise_url_tests = (
('splunk://', {
'instance': TypeError,
}),
('splunk://:@/', {
'instance': TypeError,
}),
('splunk://routekey@%badapi%', {
'instance': TypeError,
}),
('splunk://abc123', {
# No route key provided
'instance': TypeError,
}),
('splunk://%badroute%@apikey', {
'instance': TypeError,
}),
('splunk://?apikey=abc123&routing_key=db', {
# We're good
'instance': NotifySplunk,
}),
('splunk://route@abc123/entity_id', {
# We're good
'instance': NotifySplunk,
}),
('splunk://route@abc123/?entity_id=my_entity', {
# We're good
'instance': NotifySplunk,
}),
# Support legacy URL
('https://alert.victorops.com/integrations/generic/20131114/' \
'alert/apikey/routing_key', {
# We're good
'instance': NotifySplunk,
}),
# Support legacy URL (with entity id provided)
('https://alert.victorops.com/integrations/generic/20131114/' \
'alert/apikey/routing_key/entity_id', {
# We're good
'instance': NotifySplunk,
}),
# support victorops:// too!
('victorops://?apikey=abc123&route=db', {
# We're good
'instance': NotifySplunk,
}),
('splunk://?apikey=abc123&route=db', {
# We're good
'instance': NotifySplunk,
}),
('splunk://db@apikey?action=recovery', {
# Always Recovery Alias
'instance': NotifySplunk,
}),
('splunk://db@apikey?action=resolve', {
# Always Recovery Alias
'instance': NotifySplunk,
}),
('splunk://db@apikey?action=r', {
# Always Recovery (short form)
'instance': NotifySplunk,
}),
('splunk://db@apikey?action=acknowledgement', {
# Always Acknowledgement
'instance': NotifySplunk,
}),
('splunk://db@apikey?action=ack', {
# Always Acknowledgement (short form)
'instance': NotifySplunk,
}),
('splunk://db@apikey?action=critical', {
# Always Critical
'instance': NotifySplunk,
}),
('splunk://db@apikey?action=crit', {
# Always Critical (short form)
'instance': NotifySplunk,
}),
('splunk://db@apikey?action=warning', {
# Always Warning
'instance': NotifySplunk,
}),
('splunk://db@apikey?action=warn', {
# Always Warning (short form)
'instance': NotifySplunk,
}),
('splunk://db@apikey?action=info', {
# Always INFO
'instance': NotifySplunk,
}),
('splunk://db@apikey?action=i', {
# Always INFO (short form)
'instance': NotifySplunk,
}),
('splunk://db@apikey?action=invalid', {
# Invalid Action
'instance': TypeError,
}),
('splunk://db@apikey?:warning=critical', {
# Map warnings to CRITICAL
'instance': NotifySplunk,
}),
('splunk://db@apikey?:invalid=critical', {
# A bad Apprise Notification Type was provided
'instance': TypeError,
}),
('splunk://db@apikey?:warning=invalid', {
# A bad Splunk Notification Type was provided
'instance': TypeError,
}),
('splunk://db@apikey', {
'instance': NotifySplunk,
# force a failure
'response': False,
'requests_response_code': requests.codes.internal_server_error,
}),
('splunk://db@apikey', {
'instance': NotifySplunk,
# throw a bizzare code forcing us to fail to look it up
'response': False,
'requests_response_code': 999,
}),
('splunk://db@token', {
'instance': NotifySplunk,
# Throws a series of connection and transfer exceptions when this flag
# is set and tests that we gracfully handle them
'test_requests_exceptions': True,
}),
)
def test_plugin_splunk_urls():
"""
NotifySplunk() Apprise URLs
"""
# Run our general tests
AppriseURLTester(tests=apprise_url_tests).run_all()