v8.2.0: Add centralized error handling and structured logging modules

Introduced a unified error handling module (`error_handler.py`) for consistent exception management across the app. Added a structured logging system (`logger.py`) to support JSON-formatted logs, improving log readability and analysis. Updated various modules to utilize these new components for better maintainability and tracing.
master
Aidaho 2025-05-19 09:24:35 +03:00
parent 0d022c57e5
commit 8c300a9f7d
10 changed files with 598 additions and 142 deletions

View File

@ -4,12 +4,22 @@ from flask_jwt_extended import JWTManager
from flask_apscheduler import APScheduler
from app.modules.common.common import set_correct_owner
from app.modules.roxywi import logger
app = Flask(__name__)
app.config.from_object('app.config.Configuration')
app.jinja_env.add_extension('jinja2.ext.do')
app.jinja_env.add_extension('jinja2.ext.loopcontrols')
# Initialize logger
logger.setup_logger(
log_path=app.config.get('LOG_PATH', '/var/log/roxy-wi'),
log_file=app.config.get('LOG_FILE', 'roxy-wi.log'),
log_level=app.config.get('LOG_LEVEL', logger.INFO),
console_logging=app.config.get('LOG_CONSOLE', False)
)
logger.info("Roxy-WI application starting up")
cache = Cache()
cache.init_app(app)
@ -75,3 +85,7 @@ app.register_blueprint(udp_bp)
from app import login
from app import jobs
# Register error handlers
from app.modules.roxywi.error_handler import register_error_handlers
register_error_handlers(app)

View File

@ -1,4 +1,9 @@
from datetime import timedelta
import logging
import app.modules.roxy_wi_tools as roxy_wi_tools
get_config = roxy_wi_tools.GetConfigVar()
class Configuration(object):
@ -14,3 +19,9 @@ class Configuration(object):
JWT_IDENTITY_CLAIM = 'user_id'
JWT_ERROR_MESSAGE_KEY = 'error'
FLASK_PYDANTIC_VALIDATION_ERROR_RAISE = True
# Logging configuration
LOG_PATH = get_config.get_config_var('main', 'log_path')
LOG_FILE = 'roxy-wi.log'
LOG_LEVEL = logging.INFO
LOG_CONSOLE = False # Set to True to also log to console

View File

@ -6,6 +6,7 @@ import app.modules.db.user as user_sql
import app.modules.roxywi.roxy as roxy
import app.modules.roxywi.auth as roxywi_auth
import app.modules.roxywi.common as roxywi_common
from app.modules.roxywi import logger
@app.before_request
@ -56,10 +57,12 @@ def login_page():
return roxywi_common.handle_json_exceptions(e, 'Cannot check login password'), 401
try:
response = roxywi_auth.do_login(user_params, next_url)
logger.info(f'{login} login')
except Exception as e:
return roxywi_common.handle_json_exceptions(e, 'Cannot do login'), 401
return response
return redirect('/', 302)
@app.route('/logout', methods=['GET', 'POST'])

View File

@ -1,6 +1,5 @@
import os
import glob
import logging as logger
from typing import Any, Union
from flask import request, g
@ -8,16 +7,17 @@ from flask_jwt_extended import get_jwt
from flask_jwt_extended import verify_jwt_in_request
import app.modules.db.udp as udp_sql
import app.modules.db.ha_cluster as ha_sql
import app.modules.db.roxy as roxy_sql
import app.modules.db.user as user_sql
import app.modules.db.group as group_sql
import app.modules.db.server as server_sql
import app.modules.db.history as history_sql
import app.modules.db.ha_cluster as ha_sql
import app.modules.roxy_wi_tools as roxy_wi_tools
from app.modules.roxywi.class_models import ErrorResponse
from app.modules.roxywi.exception import RoxywiResourceNotFound, RoxywiGroupMismatch, RoxywiGroupNotFound, \
RoxywiPermissionError, RoxywiConflictError
from app.modules.roxywi.exception import RoxywiGroupMismatch
from app.modules.roxywi.error_handler import handle_exception, log_error
from app.modules.roxywi import logger
get_config_var = roxy_wi_tools.GetConfigVar()
@ -104,49 +104,69 @@ def get_files(folder, file_format, server_ip=None) -> list:
def logging(server_ip: Union[str, int], action: str, **kwargs) -> None:
def setup_logger(log_file: str) -> None:
"""Helper function to set up the logger configuration."""
logger.basicConfig(
filename=log_file,
format='%(asctime)s %(levelname)s: %(message)s',
level=logger.INFO,
datefmt='%b %d %H:%M:%S'
"""
Log an action with detailed information.
Args:
server_ip: The IP of the server where the action occurred
action: The action that was performed
**kwargs: Additional arguments, including:
keep_history: Whether to keep the action in the history
service: The service where the action occurred
"""
try:
# JWT validation and extracting user's information
claims = get_jwt_token_claims()
user_id = claims['user_id']
user = user_sql.get_user_id(user_id=user_id)
user_group = get_user_group()
ip = request.remote_addr
# Determine log level and clean up action string
if 'error' in action:
log_level = logger.ERROR
action = action.replace('error: : ', '')
action = action.replace('error: ', '')
elif 'warning' in action:
log_level = logger.WARNING
action = action.replace('warning: ', '')
else:
log_level = logger.INFO
# Log the message with structured context
logger.log(
log_level,
action,
server_ip=server_ip,
user_id=user.user_id,
username=user.username,
user_group=user_group,
client_ip=ip,
service=kwargs.get('service')
)
logger.getLogger("paramiko").setLevel(logger.WARNING)
# Extracted log path and file configuration
log_path = get_config_var.get_config_var('main', 'log_path')
log_file = f"{log_path}/roxy-wi.log"
setup_logger(log_file)
# JWT validation and extracting user's information
claims = get_jwt_token_claims()
user_id = claims['user_id']
user = user_sql.get_user_id(user_id=user_id)
user_group = get_user_group()
ip = request.remote_addr
if 'error' in action:
log_level = logger.error
action = action.replace('error: : ', '')
action = action.replace('error: ', '')
elif 'warning' in action:
log_level = logger.warning
action = action.replace('warning: ', '')
else:
log_level = logger.info
log_message = f"from {ip} user: {user.username}, group: {user_group}, message: {action} on: {server_ip}"
log_level(log_message)
if kwargs.get('keep_history'):
try:
keep_action_history(kwargs.get('service'), action, server_ip, user.user_id, ip)
except Exception as e:
logger.error(f'Cannot save history: {e}')
# Keep action history if requested
if kwargs.get('keep_history'):
try:
keep_action_history(kwargs.get('service'), action, server_ip, user.user_id, ip)
except Exception as e:
logger.error(f'Cannot save history: {e}', server_ip=server_ip)
except Exception as e:
# Fallback logging if we can't get user information
logger.error(f'Error in logging function: {e}', server_ip=server_ip)
def keep_action_history(service: str, action: str, server_ip: str, user_id: int, user_ip: str):
"""
Keep a history of actions in the database.
Args:
service: The service where the action occurred
action: The action that was performed
server_ip: The IP of the server where the action occurred
user_id: The ID of the user who performed the action
user_ip: The IP of the user who performed the action
"""
if user_ip == '':
user_ip = 'localhost'
@ -155,7 +175,11 @@ def keep_action_history(service: str, action: str, server_ip: str, user_id: int,
server_id = server_ip
hostname = ha_sql.select_cluster_name(int(server_id))
except Exception as e:
logging('Roxy-WI server', f'error: cannot get info about cluster {server_ip} for history: {e}')
logger.error(
f'Cannot get info about cluster {server_ip} for history',
server_ip='Roxy-WI server',
exception=e
)
return
elif service == 'UDP listener':
try:
@ -163,7 +187,11 @@ def keep_action_history(service: str, action: str, server_ip: str, user_id: int,
listener = udp_sql.get_listener(server_id)
hostname = listener.name
except Exception as e:
logging('Roxy-WI server', f'error: cannot get info about Listener {server_ip} for history: {e}')
logger.error(
f'Cannot get info about Listener {server_ip} for history',
server_ip='Roxy-WI server',
exception=e
)
return
else:
try:
@ -171,18 +199,37 @@ def keep_action_history(service: str, action: str, server_ip: str, user_id: int,
server_id = server.server_id
hostname = server.hostname
except Exception as e:
logging('Roxy-WI server', f'error: cannot get info about {server_ip} for history: {e}')
logger.error(
f'Cannot get info about {server_ip} for history',
server_ip='Roxy-WI server',
exception=e
)
return
try:
history_sql.insert_action_history(service, action, server_id, user_id, user_ip, server_ip, hostname)
except Exception as e:
logging('Roxy-WI server', f'error: cannot save a history: {e}')
logger.error(
f'Cannot save a history',
server_ip='Roxy-WI server',
exception=e,
service=service,
action=action
)
def get_dick_permit(**kwargs):
group_id = get_user_group(id=1)
return server_sql.get_dick_permit(group_id, **kwargs)
if check_user_group_for_flask():
try:
servers = server_sql.get_dick_permit(group_id, **kwargs)
except Exception as e:
raise Exception(e)
else:
return servers
else:
print('Atata!')
return []
def get_users_params(**kwargs):
@ -276,9 +323,8 @@ def handle_exceptions(ex: Exception, server_ip: str, message: str, **kwargs: Any
:param message: The error message to be logged and raised
:param kwargs: Additional keyword arguments to be passed to the logging function
:return: None
"""
logging(server_ip, f'error: {message}: {ex}', **kwargs)
log_error(ex, server_ip, message, kwargs.get('keep_history', False), kwargs.get('service'))
raise Exception(f'{message}: {ex}')
@ -293,24 +339,35 @@ def is_user_has_access_to_group(user_id: int, group_id: int) -> None:
def handle_json_exceptions(ex: Exception, message: str, server_ip='Roxy-WI server') -> dict:
logging(server_ip, f'{message}: {ex}')
"""
Handle an exception and return a JSON error response.
Args:
ex: The exception that was raised
message: Additional information to include in the response
server_ip: The IP of the server where the error occurred
Returns:
A dictionary containing the error response
"""
log_error(ex, server_ip, message)
return ErrorResponse(error=f'{message}: {ex}').model_dump(mode='json')
def handler_exceptions_for_json_data(ex: Exception, main_ex_mes: str = '') -> tuple[dict, int]:
if isinstance(ex, KeyError):
return handle_json_exceptions(ex, 'Missing key in JSON data'), 500
elif isinstance(ex, ValueError):
return handle_json_exceptions(ex, 'Wrong type or missing value in JSON data'), 500
elif isinstance(ex, RoxywiResourceNotFound):
return handle_json_exceptions(ex, 'Resource not found'), 404
elif isinstance(ex, RoxywiGroupNotFound):
return handle_json_exceptions(ex, 'Group not found'), 404
elif isinstance(ex, RoxywiGroupMismatch):
return handle_json_exceptions(ex, 'Resource not found in group'), 404
elif isinstance(ex, RoxywiPermissionError):
return handle_json_exceptions(ex, 'You cannot edit this resource'), 403
elif isinstance(ex, RoxywiConflictError):
return handle_json_exceptions(ex, 'Conflict'), 429
else:
return handle_json_exceptions(ex, main_ex_mes), 500
"""
Handle an exception and return a JSON error response with an appropriate HTTP status code.
Args:
ex: The exception that was raised
main_ex_mes: Additional information to include in the response
Returns:
A tuple containing the error response and HTTP status code
"""
# If main_ex_mes is provided, use it as additional_info
additional_info = main_ex_mes if main_ex_mes else ""
# Use the centralized error handler
return handle_exception(ex, additional_info=additional_info)

View File

@ -0,0 +1,243 @@
"""
Error handling module for Roxy-WI.
This module provides a unified way to handle errors across the application.
It includes functions for handling exceptions, logging errors, and returning
appropriate HTTP responses.
"""
from typing import Any, Dict, Tuple
from flask import jsonify, request, render_template, g, redirect, url_for
from werkzeug.exceptions import HTTPException
from flask_pydantic.exceptions import ValidationError
from app.modules.roxywi.class_models import ErrorResponse
from app.modules.roxywi.exception import (
RoxywiResourceNotFound,
RoxywiGroupMismatch,
RoxywiGroupNotFound,
RoxywiPermissionError,
RoxywiConflictError,
RoxywiValidationError,
RoxywiCheckLimits
)
import app.modules.roxywi.common as roxywi_common
from app.modules.roxywi import logger
from app.middleware import get_user_params
# Map exception types to HTTP status codes
ERROR_CODE_MAPPING = {
RoxywiResourceNotFound: 404,
RoxywiGroupNotFound: 404,
RoxywiGroupMismatch: 404,
RoxywiPermissionError: 403,
RoxywiConflictError: 409,
RoxywiValidationError: 400,
RoxywiCheckLimits: 402,
KeyError: 400,
ValueError: 400,
Exception: 500
}
# Map exception types to error messages
ERROR_MESSAGE_MAPPING = {
RoxywiResourceNotFound: "Resource not found",
RoxywiGroupNotFound: "Group not found",
RoxywiGroupMismatch: "Resource not found in group",
RoxywiPermissionError: "You do not have permission to access this resource",
RoxywiConflictError: "Conflict with existing resource",
RoxywiValidationError: "Validation error",
RoxywiCheckLimits: "You have reached your plan limits",
KeyError: "Missing required field",
ValueError: "Invalid value provided"
}
def log_error(exception: Exception, server_ip: str = "Roxy-WI server",
additional_info: str = "", keep_history: bool = False,
service: str = None) -> None:
"""
Log an error with detailed information.
Args:
exception: The exception that was raised
server_ip: The IP of the server where the error occurred
additional_info: Additional information to include in the log
keep_history: Whether to keep the error in the action history
service: The service where the error occurred
"""
error_message = str(exception)
if additional_info:
error_message = f"{additional_info}: {error_message}"
# Log the error with structured context
logger.exception(
error_message,
exc=exception,
server_ip=server_ip,
service=service
)
# Keep history if requested
if keep_history and service:
try:
# Get user information if available
user_id = None
user_ip = request.remote_addr if request else 'unknown'
if hasattr(g, 'user'):
user_id = getattr(g.user, 'user_id', None)
if user_id:
roxywi_common.keep_action_history(service, error_message, server_ip, user_id, user_ip)
except Exception as e:
logger.error(f"Failed to keep error history: {e}", server_ip=server_ip)
def handle_exception(exception: Exception, server_ip: str = "Roxy-WI server",
additional_info: str = "", keep_history: bool = False,
service: str = None) -> Tuple[Dict[str, Any], int]:
"""
Handle an exception and return an appropriate HTTP response.
Args:
exception: The exception that was raised
server_ip: The IP of the server where the error occurred
additional_info: Additional information to include in the response
keep_history: Whether to keep the error in the action history
service: The service where the error occurred
Returns:
A tuple containing the error response and HTTP status code
"""
# Log the error
log_error(exception, server_ip, additional_info, keep_history, service)
# Determine the exception type and get the appropriate status code and message
for exception_type, status_code in ERROR_CODE_MAPPING.items():
if isinstance(exception, exception_type):
message = ERROR_MESSAGE_MAPPING.get(exception_type, str(exception))
if additional_info:
message = f"{additional_info}: {message}"
# Create the error response
error_response = ErrorResponse(error=message).model_dump(mode='json')
return error_response, status_code
# If we get here, we don't have a specific handler for this exception type
error_response = ErrorResponse(error=str(exception)).model_dump(mode='json')
return error_response, 500
def register_error_handlers(app):
"""
Register error handlers for the Flask application.
Args:
app: The Flask application
"""
@app.errorhandler(Exception)
def handle_exception_error(e):
"""Handle all unhandled exceptions."""
if isinstance(e, HTTPException):
# Pass through HTTP exceptions
return e
# Log the error
log_error(e)
# Return a JSON response
error_response, status_code = handle_exception(e)
return jsonify(error_response), status_code
# Register handlers for specific HTTP errors
@app.errorhandler(ValidationError)
def handle_pydantic_validation_errors(e):
"""Handle validation errors from pydantic."""
errors = []
if e.body_params:
req_type = e.body_params
elif e.form_params:
req_type = e.form_params
elif e.path_params:
req_type = e.path_params
else:
req_type = e.query_params
for er in req_type:
if len(er["loc"]) > 0:
errors.append(f'{er["loc"][0]}: {er["msg"]}')
else:
errors.append(er["msg"])
return ErrorResponse(error=errors).model_dump(mode='json'), 400
@app.errorhandler(400)
def bad_request(e):
"""Handle 400 Bad Request errors."""
return jsonify(ErrorResponse(error="Bad request").model_dump(mode='json')), 400
@app.errorhandler(401)
def unauthorized(e):
"""Handle 401 Unauthorized errors."""
if 'api' in request.url:
return jsonify(ErrorResponse(error=str(e)).model_dump(mode='json')), 401
return redirect(url_for('login_page', next=request.full_path))
@app.errorhandler(403)
def forbidden(e):
"""Handle 403 Forbidden errors."""
if 'api' in request.url:
return jsonify(ErrorResponse(error=str(e)).model_dump(mode='json')), 403
# Get user parameters for rendering the template
get_user_params()
kwargs = {
'user_params': g.user_params,
'title': e,
'e': e
}
return render_template('error.html', **kwargs), 403
@app.errorhandler(404)
def not_found(e):
"""Handle 404 Not Found errors."""
if 'api' in request.url:
return jsonify(ErrorResponse(error=str(e)).model_dump(mode='json')), 404
# Get user parameters for rendering the template
get_user_params()
kwargs = {
'user_params': g.user_params,
'title': e,
'e': e
}
return render_template('error.html', **kwargs), 404
@app.errorhandler(405)
def method_not_allowed(e):
"""Handle 405 Method Not Allowed errors."""
return jsonify(ErrorResponse(error=str(e)).model_dump(mode='json')), 405
@app.errorhandler(415)
def unsupported_media_type(e):
"""Handle 415 Unsupported Media Type errors."""
return jsonify(ErrorResponse(error="Unsupported Media Type").model_dump(mode='json')), 415
@app.errorhandler(429)
def too_many_requests(e):
"""Handle 429 Too Many Requests errors."""
return jsonify(ErrorResponse(error="Too many requests").model_dump(mode='json')), 429
@app.errorhandler(500)
def internal_server_error(e):
"""Handle 500 Internal Server Error errors."""
if 'api' in request.url:
return jsonify(ErrorResponse(error=str(e)).model_dump(mode='json')), 500
# Get user parameters for rendering the template
get_user_params()
kwargs = {
'user_params': g.user_params,
'title': e,
'e': e
}
return render_template('error.html', **kwargs), 500

View File

@ -0,0 +1,201 @@
import logging
import json
import os
import sys
from typing import Any, Optional
from datetime import datetime
from flask import request, has_request_context
# Define log levels
DEBUG = logging.DEBUG
INFO = logging.INFO
WARNING = logging.WARNING
ERROR = logging.ERROR
CRITICAL = logging.CRITICAL
# Global logger instance
_logger = None
class StructuredLogFormatter(logging.Formatter):
"""
Custom formatter for structured logging.
Outputs logs in JSON format for better parsing by log analysis tools.
"""
def format(self, record: logging.LogRecord) -> str:
"""
Format the log record as a JSON string.
Args:
record: The log record to format
Returns:
A JSON string representation of the log record
"""
log_data = {
'timestamp': datetime.utcnow().isoformat(),
'level': record.levelname,
'message': record.getMessage(),
}
# Add exception info if available
if record.exc_info:
log_data['exception'] = {
'type': record.exc_info[0].__name__,
'message': str(record.exc_info[1]),
'traceback': self.formatException(record.exc_info)
}
# Add request context if available
if has_request_context():
log_data['request'] = {
'method': request.method,
'path': request.path,
'ip': request.remote_addr,
}
# Add extra fields from the record
for key, value in record.__dict__.items():
if key.startswith('_') and not key.startswith('__'):
clean_key = key[1:] # Remove the leading underscore
log_data[clean_key] = value
return json.dumps(log_data)
def setup_logger(
log_path: str = '/var/log/roxy-wi',
log_file: str = 'roxy-wi.log',
log_level: int = logging.INFO,
console_logging: bool = False
) -> logging.Logger:
"""
Set up the logger with the specified configuration.
Args:
log_path: The directory where log files will be stored
log_file: The name of the log file
log_level: The minimum log level to record
console_logging: Whether to also log to the console
Returns:
The configured logger instance
"""
global _logger
if _logger is not None:
return _logger
# Create logger
logger = logging.getLogger('roxy-wi')
logger.setLevel(log_level)
logger.propagate = False
# Create log directory if it doesn't exist
os.makedirs(log_path, exist_ok=True)
# Create file handler
file_handler = logging.FileHandler(os.path.join(log_path, log_file))
file_handler.setLevel(log_level)
file_handler.setFormatter(StructuredLogFormatter())
logger.addHandler(file_handler)
# Add console handler if requested
if console_logging:
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(log_level)
console_handler.setFormatter(StructuredLogFormatter())
logger.addHandler(console_handler)
_logger = logger
return logger
def get_logger() -> logging.Logger:
"""
Get the configured logger instance.
If the logger hasn't been set up yet, set it up with default configuration.
Returns:
The configured logger instance
"""
global _logger
if _logger is None:
_logger = setup_logger()
return _logger
def log(
level: int,
message: str,
server_ip: Optional[str] = None,
user_id: Optional[int] = None,
service: Optional[str] = None,
exception: Optional[Exception] = None,
**kwargs: Any
) -> None:
"""
Log a message with the specified level and additional context.
Args:
level: The log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
message: The log message
server_ip: The IP of the server related to the log message
user_id: The ID of the user related to the log message
service: The service related to the log message
exception: An exception to include in the log
**kwargs: Additional fields to include in the log
"""
logger = get_logger()
# Add extra fields with underscore prefix to avoid conflicts
extra = {f'_{k}': v for k, v in kwargs.items()}
if server_ip:
extra['_server_ip'] = server_ip
if user_id:
extra['_user_id'] = user_id
if service:
extra['_service'] = service
# Create a LogRecord with the extra fields
if exception:
logger.log(level, message, extra=extra, exc_info=exception)
else:
logger.log(level, message, extra=extra)
def debug(message: str, **kwargs: Any) -> None:
"""Log a DEBUG level message."""
log(DEBUG, message, **kwargs)
def info(message: str, **kwargs: Any) -> None:
"""Log an INFO level message."""
log(INFO, message, **kwargs)
def warning(message: str, **kwargs: Any) -> None:
"""Log a WARNING level message."""
log(WARNING, message, **kwargs)
def error(message: str, **kwargs: Any) -> None:
"""Log an ERROR level message."""
log(ERROR, message, **kwargs)
def critical(message: str, **kwargs: Any) -> None:
"""Log a CRITICAL level message."""
log(CRITICAL, message, **kwargs)
def exception(message: str, exc: Optional[Exception] = None, **kwargs: Any) -> None:
"""
Log an exception with ERROR level.
If exc is not provided, the current exception from sys.exc_info() will be used.
"""
log(ERROR, message, exception=exc, **kwargs)

View File

@ -163,7 +163,7 @@ def check_cluster_status(service: str, cluster_id: int):
return roxywi_common.handler_exceptions_for_json_data(e, 'Cannot get slaves')
status = 'ok'
statuses = []
cmd = f'systemctl is-active keepalived.service'
cmd = 'systemctl is-active keepalived.service'
for slave in slaves:
output = server_mod.ssh_command(slave[2], cmd)
statuses.append(output.replace('\n', '').replace('\r', ''))

View File

@ -1,9 +1,8 @@
import os
from typing import Union, Literal
from flask import render_template, request, g, abort, jsonify, redirect, url_for, send_from_directory
from flask import render_template, g, abort, jsonify, send_from_directory
from flask_jwt_extended import jwt_required
from flask_pydantic.exceptions import ValidationError
from flask_pydantic import validate
from pydantic import IPvAnyAddress
@ -39,77 +38,6 @@ def custom_unauthorized_response(_err):
return jsonify(error="Authorize first"), 401
@app.errorhandler(ValidationError)
def handle_pydantic_validation_errors1(e):
errors = []
if e.body_params:
req_type = e.body_params
elif e.form_params:
req_type = e.form_params
elif e.path_params:
req_type = e.path_params
else:
req_type = e.query_params
for er in req_type:
if len(er["loc"]) > 0:
errors.append(f'{er["loc"][0]}: {er["msg"]}')
else:
errors.append(er["msg"])
return ErrorResponse(error=errors).model_dump(mode='json'), 400
@app.errorhandler(401)
def no_auth(e):
if 'api' in request.url:
return jsonify({'error': str(e)}), 401
return redirect(url_for('login_page', next=request.full_path))
@app.errorhandler(403)
@get_user_params()
def page_is_forbidden(e):
if 'api' in request.url:
return jsonify({'error': str(e)}), 403
kwargs = {
'user_params': g.user_params,
'title': e,
'e': e
}
return render_template('error.html', **kwargs), 403
@app.errorhandler(404)
@get_user_params()
def page_not_found(e):
if 'api' in request.url:
return jsonify({'error': str(e)}), 404
get_user_params()
kwargs = {
'user_params': g.user_params,
'title': e,
'e': e
}
return render_template('error.html', **kwargs), 404
@app.errorhandler(405)
def method_not_allowed(e):
return jsonify({'error': str(e)}), 405
@app.errorhandler(500)
def internal_error(e):
if 'api' in request.url:
return jsonify({'error': str(e)}), 500
get_user_params()
kwargs = {
'user_params': g.user_params,
'title': e,
'e': e
}
return render_template('error.html', **kwargs), 500
@app.route('/favicon.ico')
def favicon():
return send_from_directory(os.path.join(app.root_path, 'static'),

View File

@ -11,7 +11,6 @@ import app.modules.roxywi.group as group_mod
import app.modules.roxywi.common as roxywi_common
import app.modules.server.server as server_mod
from app.middleware import get_user_params, page_for_admin, check_group
from app.modules.roxywi.exception import RoxywiResourceNotFound
from app.modules.roxywi.class_models import BaseResponse, IdResponse, IdDataResponse, ServerRequest, GroupQuery, GroupRequest
from app.modules.common.common_classes import SupportClass

View File

@ -284,7 +284,7 @@ class ServiceActionView(MethodView):
service_action.common_action(server.ip, action, service)
return BaseResponse().model_dump(mode='json')
except Exception as e:
return roxywi_common.handler_exceptions_for_json_data(e, f'Cannot do {action}')
return ErrorResponse(error=f'Cannot do {action}: {e}').model_dump(mode='json')
class ServiceBackendView(MethodView):