Opsgenie functionality ported to jira://

pull/1273/head
Chris Caron 2025-01-18 14:21:04 -05:00
parent a2a2216c46
commit 8272e3949a
6 changed files with 1247 additions and 1 deletions

View File

@ -31,6 +31,7 @@ Guilded
Home Assistant Home Assistant
httpSMS httpSMS
IFTTT IFTTT
Jira
Join Join
JSON JSON
Kavenegar Kavenegar

View File

@ -74,6 +74,7 @@ The table below identifies the services this tool supports and some example serv
| [Guilded](https://github.com/caronc/apprise/wiki/Notify_guilded) | guilded:// | (TCP) 443 | guilded://webhook_id/webhook_token<br />guilded://avatar@webhook_id/webhook_token | [Guilded](https://github.com/caronc/apprise/wiki/Notify_guilded) | guilded:// | (TCP) 443 | guilded://webhook_id/webhook_token<br />guilded://avatar@webhook_id/webhook_token
| [Home Assistant](https://github.com/caronc/apprise/wiki/Notify_homeassistant) | hassio:// or hassios:// | (TCP) 8123 or 443 | hassio://hostname/accesstoken<br />hassio://user@hostname/accesstoken<br />hassio://user:password@hostname:port/accesstoken<br />hassio://hostname/optional/path/accesstoken | [Home Assistant](https://github.com/caronc/apprise/wiki/Notify_homeassistant) | hassio:// or hassios:// | (TCP) 8123 or 443 | hassio://hostname/accesstoken<br />hassio://user@hostname/accesstoken<br />hassio://user:password@hostname:port/accesstoken<br />hassio://hostname/optional/path/accesstoken
| [IFTTT](https://github.com/caronc/apprise/wiki/Notify_ifttt) | ifttt:// | (TCP) 443 | ifttt://webhooksID/Event<br />ifttt://webhooksID/Event1/Event2/EventN<br/>ifttt://webhooksID/Event1/?+Key=Value<br/>ifttt://webhooksID/Event1/?-Key=value1 | [IFTTT](https://github.com/caronc/apprise/wiki/Notify_ifttt) | ifttt:// | (TCP) 443 | ifttt://webhooksID/Event<br />ifttt://webhooksID/Event1/Event2/EventN<br/>ifttt://webhooksID/Event1/?+Key=Value<br/>ifttt://webhooksID/Event1/?-Key=value1
| [Jira](https://github.com/caronc/apprise/wiki/Notify_jira) | jira:// | (TCP) 443 | jira://APIKey<br/>jira://APIKey/UserID<br/>jira://APIKey/#Team<br/>jira://APIKey/\*Schedule<br/>jira://APIKey/^Escalation
| [Join](https://github.com/caronc/apprise/wiki/Notify_join) | join:// | (TCP) 443 | join://apikey/device<br />join://apikey/device1/device2/deviceN/<br />join://apikey/group<br />join://apikey/groupA/groupB/groupN<br />join://apikey/DeviceA/groupA/groupN/DeviceN/ | [Join](https://github.com/caronc/apprise/wiki/Notify_join) | join:// | (TCP) 443 | join://apikey/device<br />join://apikey/device1/device2/deviceN/<br />join://apikey/group<br />join://apikey/groupA/groupB/groupN<br />join://apikey/DeviceA/groupA/groupN/DeviceN/
| [KODI](https://github.com/caronc/apprise/wiki/Notify_kodi) | kodi:// or kodis:// | (TCP) 8080 or 443 | kodi://hostname<br />kodi://user@hostname<br />kodi://user:password@hostname:port | [KODI](https://github.com/caronc/apprise/wiki/Notify_kodi) | kodi:// or kodis:// | (TCP) 8080 or 443 | kodi://hostname<br />kodi://user@hostname<br />kodi://user:password@hostname:port
| [Kumulos](https://github.com/caronc/apprise/wiki/Notify_kumulos) | kumulos:// | (TCP) 443 | kumulos://apikey/serverkey | [Kumulos](https://github.com/caronc/apprise/wiki/Notify_kumulos) | kumulos:// | (TCP) 443 | kumulos://apikey/serverkey

850
apprise/plugins/jira.py Normal file
View File

@ -0,0 +1,850 @@
# -*- coding: utf-8 -*-
# BSD 2-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2025, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# Knowing this, you can build your Jira URL as follows:
# jira://{apikey}/
# jira://{apikey}/@{user}
# jira://{apikey}/*{schedule}
# jira://{apikey}/^{escalation}
# jira://{apikey}/#{team}
#
# You can mix and match what you want to notify freely
# jira://{apikey}/@{user}/#{team}/*{schedule}/^{escalation}
#
# If no target prefix is specified, then it is assumed to be a user.
#
# API Documentation: \
# https://developer.atlassian.com/cloud/jira/ \
# service-desk-ops/rest/v1/api-group-integration-events/
import requests
from json import dumps, loads
import hashlib
from .base import NotifyBase
from ..common import NotifyType, NOTIFY_TYPES
from ..common import PersistentStoreMode
from ..utils.parse import validate_regex, is_uuid, parse_list, parse_bool
from ..locale import gettext_lazy as _
class JiraCategory(NotifyBase):
"""
We define the different category types that we can notify
"""
USER = 'user'
SCHEDULE = 'schedule'
ESCALATION = 'escalation'
TEAM = 'team'
JIRA_CATEGORIES = (
JiraCategory.USER,
JiraCategory.SCHEDULE,
JiraCategory.ESCALATION,
JiraCategory.TEAM,
)
class JiraAlertAction:
"""
Defines the supported actions
"""
# Use mapping (specify :key=arg to over-ride)
MAP = 'map'
# Create new alert (default)
NEW = 'new'
# Close Alert
CLOSE = 'close'
# Delete Alert
DELETE = 'delete'
# Acknowledge Alert
ACKNOWLEDGE = 'acknowledge'
# Add note to alert
NOTE = 'note'
JIRA_ACTIONS = (
JiraAlertAction.MAP,
JiraAlertAction.NEW,
JiraAlertAction.CLOSE,
JiraAlertAction.DELETE,
JiraAlertAction.ACKNOWLEDGE,
JiraAlertAction.NOTE,
)
# Map all support Apprise Categories to Jira Categories
JIRA_ALERT_MAP = {
NotifyType.INFO: JiraAlertAction.CLOSE,
NotifyType.SUCCESS: JiraAlertAction.CLOSE,
NotifyType.WARNING: JiraAlertAction.NEW,
NotifyType.FAILURE: JiraAlertAction.NEW,
}
# Regions
class JiraRegion:
US = 'us'
EU = 'eu'
# Jira APIs (OpsGenie port - so keep us/en support for easy transition)
JIRA_API_LOOKUP = {
JiraRegion.US: 'https://api.atlassian.com/jsm/ops/integration/v2/alerts',
JiraRegion.EU: 'https://api.atlassian.com/jsm/ops/integration/v2/alerts',
}
# A List of our regions we can use for verification
JIRA_REGIONS = (
JiraRegion.US,
JiraRegion.EU,
)
# Priorities
class JiraPriority:
LOW = 1
MODERATE = 2
NORMAL = 3
HIGH = 4
EMERGENCY = 5
JIRA_PRIORITIES = {
# Note: This also acts as a reverse lookup mapping
JiraPriority.LOW: 'low',
JiraPriority.MODERATE: 'moderate',
JiraPriority.NORMAL: 'normal',
JiraPriority.HIGH: 'high',
JiraPriority.EMERGENCY: 'emergency',
}
JIRA_PRIORITY_MAP = {
# Maps against string 'low'
'l': JiraPriority.LOW,
# Maps against string 'moderate'
'm': JiraPriority.MODERATE,
# Maps against string 'normal'
'n': JiraPriority.NORMAL,
# Maps against string 'high'
'h': JiraPriority.HIGH,
# Maps against string 'emergency'
'e': JiraPriority.EMERGENCY,
# Entries to additionally support (so more like Jira's API)
'1': JiraPriority.LOW,
'2': JiraPriority.MODERATE,
'3': JiraPriority.NORMAL,
'4': JiraPriority.HIGH,
'5': JiraPriority.EMERGENCY,
# Support p-prefix
'p1': JiraPriority.LOW,
'p2': JiraPriority.MODERATE,
'p3': JiraPriority.NORMAL,
'p4': JiraPriority.HIGH,
'p5': JiraPriority.EMERGENCY,
}
class NotifyJira(NotifyBase):
"""
A wrapper for Jira Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Jira'
# The services URL
service_url = 'https://atlassian.com/'
# All notification requests are secure
secure_protocol = 'jira'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_jira'
# The maximum length of the body
body_maxlen = 15000
# Our default is to no not use persistent storage beyond in-memory
# reference
storage_mode = PersistentStoreMode.AUTO
# If we don't have the specified min length, then we don't bother using
# the body directive
jira_body_minlen = 130
# The default region to use if one isn't otherwise specified
jira_default_region = JiraRegion.US
# The maximum allowable targets within a notification
default_batch_size = 50
# Defines our default message mapping
jira_message_map = {
# Add a note to existing alert
NotifyType.INFO: JiraAlertAction.NOTE,
# Close existing alert
NotifyType.SUCCESS: JiraAlertAction.CLOSE,
# Create notice
NotifyType.WARNING: JiraAlertAction.NEW,
# Create notice
NotifyType.FAILURE: JiraAlertAction.NEW,
}
# Define object templates
templates = (
'{schema}://{apikey}',
'{schema}://{user}@{apikey}',
'{schema}://{apikey}/{targets}',
'{schema}://{user}@{apikey}/{targets}',
)
# Define our template tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'apikey': {
'name': _('API Key'),
'type': 'string',
'private': True,
'required': True,
},
'user': {
'name': _('Username'),
'type': 'string',
},
'target_escalation': {
'name': _('Target Escalation'),
'prefix': '^',
'type': 'string',
'map_to': 'targets',
},
'target_schedule': {
'name': _('Target Schedule'),
'type': 'string',
'prefix': '*',
'map_to': 'targets',
},
'target_user': {
'name': _('Target User'),
'type': 'string',
'prefix': '@',
'map_to': 'targets',
},
'target_team': {
'name': _('Target Team'),
'type': 'string',
'prefix': '#',
'map_to': 'targets',
},
'targets': {
'name': _('Targets '),
'type': 'list:string',
},
})
# Define our template arguments
template_args = dict(NotifyBase.template_args, **{
'region': {
'name': _('Region Name'),
'type': 'choice:string',
'values': JIRA_REGIONS,
'default': JiraRegion.US,
'map_to': 'region_name',
},
'batch': {
'name': _('Batch Mode'),
'type': 'bool',
'default': False,
},
'priority': {
'name': _('Priority'),
'type': 'choice:int',
'values': JIRA_PRIORITIES,
'default': JiraPriority.NORMAL,
},
'entity': {
'name': _('Entity'),
'type': 'string',
},
'alias': {
'name': _('Alias'),
'type': 'string',
},
'tags': {
'name': _('Tags'),
'type': 'string',
},
'to': {
'alias_of': 'targets',
},
'action': {
'name': _('Action'),
'type': 'choice:string',
'values': JIRA_ACTIONS,
'default': JIRA_ACTIONS[0],
}
})
# Map of key-value pairs to use as custom properties of the alert.
template_kwargs = {
'details': {
'name': _('Details'),
'prefix': '+',
},
'mapping': {
'name': _('Action Mapping'),
'prefix': ':',
},
}
def __init__(self, apikey, targets, region_name=None, details=None,
priority=None, alias=None, entity=None, batch=False,
tags=None, action=None, mapping=None, **kwargs):
"""
Initialize Jira Object
"""
super().__init__(**kwargs)
# API Key (associated with project)
self.apikey = validate_regex(apikey)
if not self.apikey:
msg = 'An invalid Jira API Key ' \
'({}) was specified.'.format(apikey)
self.logger.warning(msg)
raise TypeError(msg)
# The Priority of the message
self.priority = NotifyJira.template_args['priority']['default'] \
if not priority else \
next((
v for k, v in JIRA_PRIORITY_MAP.items()
if str(priority).lower().startswith(k)),
NotifyJira.template_args['priority']['default'])
# Store our region
try:
self.region_name = self.jira_default_region \
if region_name is None else region_name.lower()
if self.region_name not in JIRA_REGIONS:
# allow the outer except to handle this common response
raise
except:
# Invalid region specified
msg = 'The Jira region specified ({}) is invalid.' \
.format(region_name)
self.logger.warning(msg)
raise TypeError(msg)
if action and isinstance(action, str):
self.action = next(
(a for a in JIRA_ACTIONS if a.startswith(action)), None)
if self.action not in JIRA_ACTIONS:
msg = 'The Jira action specified ({}) is invalid.'\
.format(action)
self.logger.warning(msg)
raise TypeError(msg)
else:
self.action = self.template_args['action']['default']
# Store our mappings
self.mapping = self.jira_message_map.copy()
if mapping and isinstance(mapping, dict):
for _k, _v in mapping.items():
# Get our mapping
k = next((t for t in NOTIFY_TYPES if t.startswith(_k)), None)
if not k:
msg = 'The Jira mapping key specified ({}) ' \
'is invalid.'.format(_k)
self.logger.warning(msg)
raise TypeError(msg)
_v_lower = _v.lower()
v = next((v for v in JIRA_ACTIONS[1:]
if v.startswith(_v_lower)), None)
if not v:
msg = 'The Jira mapping value (assigned to {}) ' \
'specified ({}) is invalid.'.format(k, _v)
self.logger.warning(msg)
raise TypeError(msg)
# Update our mapping
self.mapping[k] = v
self.details = {}
if details:
# Store our extra details
self.details.update(details)
# Prepare Batch Mode Flag
self.batch_size = self.default_batch_size if batch else 1
# Assign our tags (if defined)
self.__tags = parse_list(tags)
# Assign our entity (if defined)
self.entity = entity
# Assign our alias (if defined)
self.alias = alias
# Initialize our Targets
self.targets = []
# Sort our targets
for _target in parse_list(targets):
target = _target.strip()
if len(target) < 2:
self.logger.debug('Ignoring Jira Entry: %s' % target)
continue
if target.startswith(NotifyJira.template_tokens
['target_team']['prefix']):
self.targets.append(
{'type': JiraCategory.TEAM, 'id': target[1:]}
if is_uuid(target[1:]) else
{'type': JiraCategory.TEAM, 'name': target[1:]})
elif target.startswith(NotifyJira.template_tokens
['target_schedule']['prefix']):
self.targets.append(
{'type': JiraCategory.SCHEDULE, 'id': target[1:]}
if is_uuid(target[1:]) else
{'type': JiraCategory.SCHEDULE, 'name': target[1:]})
elif target.startswith(NotifyJira.template_tokens
['target_escalation']['prefix']):
self.targets.append(
{'type': JiraCategory.ESCALATION, 'id': target[1:]}
if is_uuid(target[1:]) else
{'type': JiraCategory.ESCALATION, 'name': target[1:]})
elif target.startswith(NotifyJira.template_tokens
['target_user']['prefix']):
self.targets.append(
{'type': JiraCategory.USER, 'id': target[1:]}
if is_uuid(target[1:]) else
{'type': JiraCategory.USER, 'username': target[1:]})
else:
# Ambiguious entry; treat it as a user but not before
# displaying a warning to the end user first:
self.logger.debug(
'Treating ambigious Jira target %s as a user', target)
self.targets.append(
{'type': JiraCategory.USER, 'id': target}
if is_uuid(target) else
{'type': JiraCategory.USER, 'username': target})
def _fetch(self, method, url, payload, params=None):
"""
Performs server retrieval/update and returns JSON Response
"""
headers = {
'User-Agent': self.app_id,
'Accepts': 'application/json',
'Content-Type': 'application/json',
'Authorization': 'GenieKey {}'.format(self.apikey),
}
# Some Debug Logging
self.logger.debug(
'Jira POST URL: {} (cert_verify={})'.format(
url, self.verify_certificate))
self.logger.debug('Jira Payload: {}' .format(payload))
# Initialize our response object
content = {}
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = method(
url,
data=dumps(payload),
params=params,
headers=headers,
verify=self.verify_certificate,
timeout=self.request_timeout,
)
# A Response might look like:
# {
# "result": "Request will be processed",
# "took": 0.302,
# "requestId": "43a29c5c-3dbf-4fa4-9c26-f4f71023e120"
# }
try:
# Update our response object
content = loads(r.content)
except (AttributeError, TypeError, ValueError):
# ValueError = r.content is Unparsable
# TypeError = r.content is None
# AttributeError = r is None
content = {}
if r.status_code not in (
requests.codes.accepted, requests.codes.ok):
status_str = \
NotifyBase.http_response_code_lookup(
r.status_code)
self.logger.warning(
'Failed to send Jira notification:'
'{}{}error={}.'.format(
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
return (False, content.get('requestId'))
# If we reach here; the message was sent
self.logger.info('Sent Jira notification')
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
return (True, content.get('requestId'))
except requests.RequestException as e:
self.logger.warning(
'A Connection error occurred sending Jira '
'notification.')
self.logger.debug('Socket Exception: %s' % str(e))
return (False, content.get('requestId'))
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Jira Notification
"""
# Get our Jira Action
action = JIRA_ALERT_MAP[notify_type] \
if self.action == JiraAlertAction.MAP else self.action
# Prepare our URL as it's based on our hostname
notify_url = JIRA_API_LOOKUP[self.region_name]
# Initialize our has_error flag
has_error = False
# Default method is to post
method = requests.post
# For indexing in persistent store
key = hashlib.sha1(
(self.entity if self.entity else (
self.alias if self.alias else (
title if title else self.app_id)))
.encode('utf-8')).hexdigest()[0:10]
# Get our Jira Request IDs
request_ids = self.store.get(key, [])
if not isinstance(request_ids, list):
request_ids = []
if action == JiraAlertAction.NEW:
# Create a copy ouf our details object
details = self.details.copy()
if 'type' not in details:
details['type'] = notify_type
# Use body if title not set
title_body = body if not title else title
# Prepare our payload
payload = {
'source': self.app_desc,
'message': title_body,
'description': body,
'details': details,
'priority': 'P{}'.format(self.priority),
}
# Use our body directive if we exceed the minimum message
# limitation
if len(payload['message']) > self.jira_body_minlen:
payload['message'] = '{}...'.format(
title_body[:self.jira_body_minlen - 3])
if self.__tags:
payload['tags'] = self.__tags
if self.entity:
payload['entity'] = self.entity
if self.alias:
payload['alias'] = self.alias
if self.user:
payload['user'] = self.user
# reset our request IDs - we will re-populate them
request_ids = []
length = len(self.targets) if self.targets else 1
for index in range(0, length, self.batch_size):
if self.targets:
# If there were no targets identified, then we simply
# just iterate once without the responders set
payload['responders'] = \
self.targets[index:index + self.batch_size]
# Perform our post
success, request_id = self._fetch(
method, notify_url, payload)
if success and request_id:
# Save our response
request_ids.append(request_id)
else:
has_error = True
# Store our entries for a maximum of 60 days
self.store.set(key, request_ids, expires=60 * 60 * 24 * 60)
elif request_ids:
# Prepare our payload
payload = {
'source': self.app_desc,
'note': body,
}
if self.user:
payload['user'] = self.user
# Prepare our Identifier type
params = {
'identifierType': 'id',
}
for request_id in request_ids:
if action == JiraAlertAction.DELETE:
# Update our URL
url = f'{notify_url}/{request_id}'
method = requests.delete
elif action == JiraAlertAction.ACKNOWLEDGE:
url = f'{notify_url}/{request_id}/acknowledge'
elif action == JiraAlertAction.CLOSE:
url = f'{notify_url}/{request_id}/close'
else: # action == JiraAlertAction.CLOSE:
url = f'{notify_url}/{request_id}/notes'
# Perform our post
success, _ = self._fetch(method, url, payload, params)
if not success:
has_error = True
if not has_error and action == JiraAlertAction.DELETE:
# Remove cached entry
self.store.clear(key)
else:
self.logger.info(
'No Jira notification sent due to (nothing to %s) '
'condition', self.action)
return not has_error
@property
def url_identifier(self):
"""
Returns all of the identifiers that make this URL unique from
another simliar one. Targets or end points should never be identified
here.
"""
return (self.secure_protocol, self.region_name, self.apikey)
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any URL parameters
params = {
'action': self.action,
'region': self.region_name,
'priority':
JIRA_PRIORITIES[self.template_args['priority']['default']]
if self.priority not in JIRA_PRIORITIES
else JIRA_PRIORITIES[self.priority],
'batch': 'yes' if self.batch_size > 1 else 'no',
}
# Assign our entity value (if defined)
if self.entity:
params['entity'] = self.entity
# Assign our alias value (if defined)
if self.alias:
params['alias'] = self.alias
# Assign our tags (if specifed)
if self.__tags:
params['tags'] = ','.join(self.__tags)
# Append our details into our parameters
params.update({'+{}'.format(k): v for k, v in self.details.items()})
# Append our assignment extra's into our parameters
params.update(
{':{}'.format(k): v for k, v in self.mapping.items()})
# Extend our parameters
params.update(self.url_parameters(privacy=privacy, *args, **kwargs))
# A map allows us to map our target types so they can be correctly
# placed back into your URL below. Hence map the 'user' -> '@'
__map = {
JiraCategory.USER:
NotifyJira.template_tokens['target_user']['prefix'],
JiraCategory.SCHEDULE:
NotifyJira.template_tokens['target_schedule']['prefix'],
JiraCategory.ESCALATION:
NotifyJira.template_tokens['target_escalation']['prefix'],
JiraCategory.TEAM:
NotifyJira.template_tokens['target_team']['prefix'],
}
return '{schema}://{user}{apikey}/{targets}/?{params}'.format(
schema=self.secure_protocol,
user='{}@'.format(self.user) if self.user else '',
apikey=self.pprint(self.apikey, privacy, safe=''),
targets='/'.join(
[NotifyJira.quote('{}{}'.format(
__map[x['type']],
x.get('id', x.get('name', x.get('username')))))
for x in self.targets]),
params=NotifyJira.urlencode(params))
def __len__(self):
"""
Returns the number of targets associated with this notification
"""
#
# Factor batch into calculation
#
targets = len(self.targets)
if self.batch_size > 1:
targets = int(targets / self.batch_size) + \
(1 if targets % self.batch_size else 0)
return targets if targets > 0 else 1
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results
# The API Key is stored in the hostname
results['apikey'] = NotifyJira.unquote(results['host'])
# Get our Targets
results['targets'] = NotifyJira.split_path(results['fullpath'])
# Add our Meta Detail keys
results['details'] = {NotifyBase.unquote(x): NotifyBase.unquote(y)
for x, y in results['qsd+'].items()}
# Set our priority
if 'priority' in results['qsd'] and len(results['qsd']['priority']):
results['priority'] = \
NotifyJira.unquote(results['qsd']['priority'])
# Get Batch Boolean (if set)
results['batch'] = \
parse_bool(
results['qsd'].get(
'batch',
NotifyJira.template_args['batch']['default']))
if 'apikey' in results['qsd'] and len(results['qsd']['apikey']):
results['apikey'] = \
NotifyJira.unquote(results['qsd']['apikey'])
if 'tags' in results['qsd'] and len(results['qsd']['tags']):
# Extract our tags
results['tags'] = \
parse_list(NotifyJira.unquote(results['qsd']['tags']))
if 'region' in results['qsd'] and len(results['qsd']['region']):
# Extract our region
results['region_name'] = \
NotifyJira.unquote(results['qsd']['region'])
if 'entity' in results['qsd'] and len(results['qsd']['entity']):
# Extract optional entity field
results['entity'] = \
NotifyJira.unquote(results['qsd']['entity'])
if 'alias' in results['qsd'] and len(results['qsd']['alias']):
# Extract optional alias field
results['alias'] = \
NotifyJira.unquote(results['qsd']['alias'])
# Handle 'to' email address
if 'to' in results['qsd'] and len(results['qsd']['to']):
results['targets'].append(results['qsd']['to'])
# Store our action (if defined)
if 'action' in results['qsd'] and len(results['qsd']['action']):
results['action'] = \
NotifyJira.unquote(results['qsd']['action'])
# store any custom mapping defined
results['mapping'] = \
{NotifyJira.unquote(x): NotifyJira.unquote(y)
for x, y in results['qsd:'].items()}
return results

View File

@ -339,6 +339,11 @@ class NotifyOpsgenie(NotifyBase):
""" """
super().__init__(**kwargs) super().__init__(**kwargs)
# Notify users that this plugin will require them to switch soon
self.logger.deprecate(
'Opsgenie will soon be depricated and moved to Jira; '
'visit https://atlassian.com/ for more details')
# API Key (associated with project) # API Key (associated with project)
self.apikey = validate_regex(apikey) self.apikey = validate_regex(apikey)
if not self.apikey: if not self.apikey:

View File

@ -42,7 +42,7 @@ it easy to access:
Africas Talking, Apprise API, APRS, AWS SES, AWS SNS, Bark, Burst SMS, Africas Talking, Apprise API, APRS, AWS SES, AWS SNS, Bark, Burst SMS,
BulkSMS, BulkVS, Chanify, ClickSend, DAPNET, DingTalk, Discord, E-Mail, Emby, BulkSMS, BulkVS, Chanify, ClickSend, DAPNET, DingTalk, Discord, E-Mail, Emby,
FCM, Feishu, Flock, Free Mobile, Google Chat, Gotify, Growl, Guilded, Home FCM, Feishu, Flock, Free Mobile, Google Chat, Gotify, Growl, Guilded, Home
Assistant, httpSMS, IFTTT, Join, Kavenegar, KODI, Kumulos, LaMetric, Line, Assistant, httpSMS, IFTTT, Jira, Join, Kavenegar, KODI, Kumulos, LaMetric, Line,
LunaSea, MacOSX, Mailgun, Mastodon, Mattermost,Matrix, MessageBird, Microsoft LunaSea, MacOSX, Mailgun, Mastodon, Mattermost,Matrix, MessageBird, Microsoft
Windows, Microsoft Teams, Misskey, MQTT, MSG91, MyAndroid, Nexmo, Nextcloud, Windows, Microsoft Teams, Misskey, MQTT, MSG91, MyAndroid, Nexmo, Nextcloud,
NextcloudTalk, Notica, Notifiarr, Notifico, ntfy, Office365, OneSignal, NextcloudTalk, Notica, Notifiarr, Notifico, ntfy, Office365, OneSignal,

389
test/test_plugin_jira.py Normal file
View File

@ -0,0 +1,389 @@
# -*- coding: utf-8 -*-
# BSD 2-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2025, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from unittest import mock
from json import dumps
import requests
import apprise
from apprise.plugins.jira import (
NotifyType, NotifyJira, JiraPriority)
from helpers import AppriseURLTester
# Disable logging for a cleaner testing output
import logging
logging.disable(logging.CRITICAL)
# a test UUID we can use
UUID4 = '8b799edf-6f98-4d3a-9be7-2862fb4e5752'
JIRA_GOOD_RESPONSE = dumps({
"result": "Request will be processed",
"took": 0.204,
"requestId": "43a29c5c-3dbf-4fa4-9c26-f4f71023e120"
})
# Our Testing URLs
apprise_url_tests = (
('jira://', {
# We failed to identify any valid authentication
'instance': TypeError,
}),
('jira://:@/', {
# We failed to identify any valid authentication
'instance': TypeError,
}),
('jira://%20%20/', {
# invalid apikey specified
'instance': TypeError,
}),
('jira://apikey/user/?region=xx', {
# invalid region id
'instance': TypeError,
}),
('jira://user@apikey/', {
# No targets specified; this is allowed
'instance': NotifyJira,
'notify_type': NotifyType.WARNING,
# Bad response returned
'requests_response_text': '{',
# We will not be successful sending the notice
'notify_response': False,
}),
('jira://apikey/', {
# No targets specified; this is allowed
'instance': NotifyJira,
'notify_type': NotifyType.FAILURE,
# Our response expected server response
'requests_response_text': JIRA_GOOD_RESPONSE,
}),
('jira://apikey/user', {
# Valid user
'instance': NotifyJira,
'notify_type': NotifyType.FAILURE,
# Our response expected server response
'requests_response_text': JIRA_GOOD_RESPONSE,
'privacy_url': 'jira://a...y/%40user',
}),
('jira://apikey/@user?region=eu', {
# European Region
'instance': NotifyJira,
'notify_type': NotifyType.FAILURE,
# Our response expected server response
'requests_response_text': JIRA_GOOD_RESPONSE,
}),
('jira://apikey/@user?entity=A%20Entity', {
# Assign an entity
'instance': NotifyJira,
'notify_type': NotifyType.FAILURE,
# Our response expected server response
'requests_response_text': JIRA_GOOD_RESPONSE,
}),
('jira://apikey/@user?alias=An%20Alias', {
# Assign an alias
'instance': NotifyJira,
'notify_type': NotifyType.FAILURE,
# Our response expected server response
'requests_response_text': JIRA_GOOD_RESPONSE,
}),
# Bad Action
('jira://apikey/@user?action=invalid', {
# Assign an entity
'instance': TypeError,
}),
('jira://from@apikey/@user?:invalid=note', {
# Assign an entity
'instance': TypeError,
}),
('jira://apikey/@user?:warning=invalid', {
# Assign an entity
'instance': TypeError,
}),
# Creates an index entry
('jira://apikey/@user?entity=index&action=new', {
# Assign an entity
'instance': NotifyJira,
'notify_type': NotifyType.FAILURE,
# Our response expected server response
'requests_response_text': JIRA_GOOD_RESPONSE,
}),
# Now action it
('jira://apikey/@user?entity=index&action=acknowledge', {
# Assign an entity
'instance': NotifyJira,
'notify_type': NotifyType.SUCCESS,
# Our response expected server response
'requests_response_text': JIRA_GOOD_RESPONSE,
}),
('jira://from@apikey/@user?entity=index&action=note', {
# Assign an entity
'instance': NotifyJira,
'notify_type': NotifyType.SUCCESS,
# Our response expected server response
'requests_response_text': JIRA_GOOD_RESPONSE,
}),
('jira://from@apikey/@user?entity=index&action=note', {
# Assign an entity
'instance': NotifyJira,
'notify_type': NotifyType.SUCCESS,
# Our response expected server response
'requests_response_text': JIRA_GOOD_RESPONSE,
'response': False,
'requests_response_code': 500,
}),
('jira://apikey/@user?entity=index&action=close', {
# Assign an entity
'instance': NotifyJira,
'notify_type': NotifyType.SUCCESS,
# Our response expected server response
'requests_response_text': JIRA_GOOD_RESPONSE,
}),
('jira://apikey/@user?entity=index&action=delete', {
# Assign an entity
'instance': NotifyJira,
'notify_type': NotifyType.SUCCESS,
# Our response expected server response
'requests_response_text': JIRA_GOOD_RESPONSE,
}),
# map info messages to generate a new message
('jira://apikey/@user?entity=index2&:info=new', {
# Assign an entity
'instance': NotifyJira,
'notify_type': NotifyType.INFO,
# Our response expected server response
'requests_response_text': JIRA_GOOD_RESPONSE,
}),
('jira://joe@apikey/@user?priority=p3', {
# Assign our priority
'instance': NotifyJira,
'notify_type': NotifyType.FAILURE,
# Our response expected server response
'requests_response_text': JIRA_GOOD_RESPONSE,
}),
('jira://apikey/?tags=comma,separated', {
# Test our our 'tags' (tag is reserved in Apprise) but not 'tags'
# Also test the fact we do not need to define a target
'instance': NotifyJira,
'notify_type': NotifyType.FAILURE,
# Our response expected server response
'requests_response_text': JIRA_GOOD_RESPONSE,
}),
('jira://apikey/@user?priority=invalid', {
# Invalid priority (loads using default)
'instance': NotifyJira,
'notify_type': NotifyType.FAILURE,
# Our response expected server response
'requests_response_text': JIRA_GOOD_RESPONSE,
}),
('jira://apikey/user@email.com/#team/*sche/^esc/%20/a', {
# Valid user (email), valid schedule, Escalated ID,
# an invalid entry (%20), and too short of an entry (a)
'instance': NotifyJira,
'notify_type': NotifyType.FAILURE,
# Our response expected server response
'requests_response_text': JIRA_GOOD_RESPONSE,
}),
('jira://apikey/@{}/#{}/*{}/^{}/'.format(
UUID4, UUID4, UUID4, UUID4), {
# similar to the above, except we use the UUID's
'instance': NotifyJira,
'notify_type': NotifyType.FAILURE,
# Our response expected server response
'requests_response_text': JIRA_GOOD_RESPONSE,
}),
# Same link as before but @ missing at the front causing an ambigious
# lookup however the entry is treated a though a @ was in front (user)
('jira://apikey/{}/#{}/*{}/^{}/'.format(
UUID4, UUID4, UUID4, UUID4), {
# similar to the above, except we use the UUID's
'instance': NotifyJira,
'notify_type': NotifyType.FAILURE,
# Our response expected server response
'requests_response_text': JIRA_GOOD_RESPONSE,
}),
('jira://apikey?to=#team,user&+key=value&+type=override', {
# Test to= and details (key/value pair) also override 'type'
'instance': NotifyJira,
'notify_type': NotifyType.FAILURE,
# Our response expected server response
'requests_response_text': JIRA_GOOD_RESPONSE,
}),
('jira://apikey/#team/@user/?batch=yes', {
# Test batch=
'instance': NotifyJira,
'notify_type': NotifyType.FAILURE,
# Our response expected server response
'requests_response_text': JIRA_GOOD_RESPONSE,
}),
('jira://apikey/#team/@user/?batch=no', {
# Test batch=
'instance': NotifyJira,
'notify_type': NotifyType.FAILURE,
# Our response expected server response
'requests_response_text': JIRA_GOOD_RESPONSE,
}),
('jira://?apikey=abc&to=user', {
# Test Kwargs
'instance': NotifyJira,
'notify_type': NotifyType.FAILURE,
# Our response expected server response
'requests_response_text': JIRA_GOOD_RESPONSE,
}),
('jira://apikey/#team/user/', {
'instance': NotifyJira,
# throw a bizzare code forcing us to fail to look it up
'notify_type': NotifyType.FAILURE,
# Our response expected server response
'requests_response_text': JIRA_GOOD_RESPONSE,
'response': False,
'requests_response_code': 999,
}),
('jira://apikey/#topic1/device/', {
'instance': NotifyJira,
'notify_type': NotifyType.FAILURE,
# Our response expected server response
'requests_response_text': JIRA_GOOD_RESPONSE,
# Throws a series of connection and transfer exceptions when this flag
# is set and tests that we gracfully handle them
'test_requests_exceptions': True,
}),
)
def test_plugin_jira_urls(tmpdir):
"""
NotifyJira() Apprise URLs
"""
# Run our general tests
AppriseURLTester(tests=apprise_url_tests).run_all(str(tmpdir))
@mock.patch('requests.post')
def test_plugin_jira_config_files(mock_post):
"""
NotifyJira() Config File Cases
"""
content = """
urls:
- jira://apikey/user:
- priority: 1
tag: jira_int low
- priority: "1"
tag: jira_str_int low
- priority: "p1"
tag: jira_pstr_int low
- priority: low
tag: jira_str low
# This will take on moderate (default) priority
- priority: invalid
tag: jira_invalid
- jira://apikey2/user2:
- priority: 5
tag: jira_int emerg
- priority: "5"
tag: jira_str_int emerg
- priority: "p5"
tag: jira_pstr_int emerg
- priority: emergency
tag: jira_str emerg
"""
# Prepare Mock
mock_post.return_value = requests.Request()
mock_post.return_value.status_code = requests.codes.ok
mock_post.return_value.content = JIRA_GOOD_RESPONSE
# Create ourselves a config object
ac = apprise.AppriseConfig()
assert ac.add_config(content=content) is True
aobj = apprise.Apprise()
# Add our configuration
aobj.add(ac)
# We should be able to read our 9 servers from that
# 4x low
# 4x emerg
# 1x invalid (so takes on normal priority)
assert len(ac.servers()) == 9
assert len(aobj) == 9
assert len([x for x in aobj.find(tag='low')]) == 4
for s in aobj.find(tag='low'):
assert s.priority == JiraPriority.LOW
assert len([x for x in aobj.find(tag='emerg')]) == 4
for s in aobj.find(tag='emerg'):
assert s.priority == JiraPriority.EMERGENCY
assert len([x for x in aobj.find(tag='jira_str')]) == 2
assert len([x for x in aobj.find(tag='jira_str_int')]) == 2
assert len([x for x in aobj.find(tag='jira_pstr_int')]) == 2
assert len([x for x in aobj.find(tag='jira_int')]) == 2
assert len([x for x in aobj.find(tag='jira_invalid')]) == 1
assert next(aobj.find(tag='jira_invalid')).priority == \
JiraPriority.NORMAL
@mock.patch('requests.post')
def test_plugin_jira_edge_case(mock_post):
"""
NotifyJira() Edge Cases
"""
# Prepare Mock
mock_post.return_value = requests.Request()
mock_post.return_value.status_code = requests.codes.ok
mock_post.return_value.content = JIRA_GOOD_RESPONSE
instance = apprise.Apprise.instantiate('jira://apikey')
assert isinstance(instance, NotifyJira)
assert len(instance.store.keys()) == 0
assert instance.notify('test', 'key', NotifyType.FAILURE) is True
assert len(instance.store.keys()) == 1
# Again just causes same index to get over-written
assert instance.notify('test', 'key', NotifyType.FAILURE) is True
assert len(instance.store.keys()) == 1
assert 'a62f2225bf' in instance.store
# Assign it garbage
instance.store['a62f2225bf'] = 'garbage'
# This causes an internal check to fail where the keys are expected to be
# as a list (this one is now a string)
# content self corrects and things are fine
assert instance.notify('test', 'key', NotifyType.FAILURE) is True
assert len(instance.store.keys()) == 1
# new key is new index
assert instance.notify('test', 'key2', NotifyType.FAILURE) is True
assert len(instance.store.keys()) == 2