haproxy-wi/app/modules/roxywi/common.py

374 lines
10 KiB
Python

import os
import glob
from typing import Any, Union
from flask import request, g
from flask_jwt_extended import get_jwt
from flask_jwt_extended import verify_jwt_in_request
import app.modules.db.udp as udp_sql
import app.modules.db.ha_cluster as ha_sql
import app.modules.db.roxy as roxy_sql
import app.modules.db.user as user_sql
import app.modules.db.group as group_sql
import app.modules.db.server as server_sql
import app.modules.db.history as history_sql
import app.modules.roxy_wi_tools as roxy_wi_tools
from app.modules.roxywi.class_models import ErrorResponse
from app.modules.roxywi.exception import RoxywiGroupMismatch
from app.modules.roxywi.error_handler import handle_exception, log_error
from app.modules.roxywi import logger
get_config_var = roxy_wi_tools.GetConfigVar()
def get_jwt_token_claims() -> dict:
verify_jwt_in_request()
claims = get_jwt()
claim = {'user_id': claims['user_id'], 'group': claims['group']}
return claim
def get_user_group(**kwargs) -> int:
try:
claims = get_jwt_token_claims()
user_group_id = claims['group']
group = group_sql.get_group(user_group_id)
if group.group_id == int(user_group_id):
if kwargs.get('id'):
user_group = group.group_id
else:
user_group = group.name
else:
user_group = ''
except Exception as e:
raise Exception(f'error: {e}')
return user_group
def check_user_group_for_flask():
claims = get_jwt_token_claims()
user_id = claims['user_id']
group_id = claims['group']
if user_sql.check_user_group(user_id, group_id):
return True
else:
logging('Roxy-WI server', 'warning: has tried to actions in not his group')
return False
def check_user_group_for_socket(user_id: int, group_id: int) -> bool:
if user_sql.check_user_group(user_id, group_id):
return True
else:
logging('Roxy-WI server', 'warning: has tried to actions in not his group')
return False
def check_is_server_in_group(server_ip: str) -> bool:
group_id = get_user_group(id=1)
server = server_sql.get_server_by_ip(server_ip)
if (server.ip == server_ip and int(server.group_id) == int(group_id)) or group_id == 1:
return True
else:
logging('Roxy-WI server', 'warning: has tried to actions in not his group server')
return False
def get_files(folder, file_format, server_ip=None) -> list:
if file_format == 'log':
file = []
else:
file = set()
return_files = set()
i = 0
for files in sorted(glob.glob(os.path.join(folder, f'*.{file_format}*'))):
if file_format == 'log':
try:
file += [(i, files.split('/')[4])]
except Exception as e:
print(e)
else:
file.add(files.split('/')[-1])
i += 1
files = file
if file_format == 'cfg' or file_format == 'conf':
for file in files:
ip = file.split("-")
if server_ip == ip[0]:
return_files.add(file)
return sorted(return_files, reverse=True)
else:
return file
def logging(server_ip: Union[str, int], action: str, **kwargs) -> None:
"""
Log an action with detailed information.
Args:
server_ip: The IP of the server where the action occurred
action: The action that was performed
**kwargs: Additional arguments, including:
keep_history: Whether to keep the action in the history
service: The service where the action occurred
"""
try:
# JWT validation and extracting user's information
claims = get_jwt_token_claims()
user_id = claims['user_id']
user = user_sql.get_user_id(user_id=user_id)
user_group = get_user_group()
ip = request.remote_addr
# Determine log level and clean up action string
if 'error' in action:
log_level = logger.ERROR
action = action.replace('error: : ', '')
action = action.replace('error: ', '')
elif 'warning' in action:
log_level = logger.WARNING
action = action.replace('warning: ', '')
else:
log_level = logger.INFO
# Log the message with structured context
logger.log(
log_level,
action,
server_ip=server_ip,
user_id=user.user_id,
username=user.username,
user_group=user_group,
client_ip=ip,
service=kwargs.get('service')
)
# Keep action history if requested
if kwargs.get('keep_history'):
try:
keep_action_history(kwargs.get('service'), action, server_ip, user.user_id, ip)
except Exception as e:
logger.error(f'Cannot save history: {e}', server_ip=server_ip)
except Exception as e:
# Fallback logging if we can't get user information
logger.error(f'Error in logging function: {e}', server_ip=server_ip)
def keep_action_history(service: str, action: str, server_ip: str, user_id: int, user_ip: str):
"""
Keep a history of actions in the database.
Args:
service: The service where the action occurred
action: The action that was performed
server_ip: The IP of the server where the action occurred
user_id: The ID of the user who performed the action
user_ip: The IP of the user who performed the action
"""
if user_ip == '':
user_ip = 'localhost'
if service == 'HA cluster':
try:
server_id = server_ip
hostname = ha_sql.select_cluster_name(int(server_id))
except Exception as e:
logger.error(
f'Cannot get info about cluster {server_ip} for history',
server_ip='Roxy-WI server',
exception=e
)
return
elif service == 'UDP listener':
try:
server_id = int(server_ip)
listener = udp_sql.get_listener(server_id)
hostname = listener.name
except Exception as e:
logger.error(
f'Cannot get info about Listener {server_ip} for history',
server_ip='Roxy-WI server',
exception=e
)
return
else:
try:
server = server_sql.get_server_by_ip(server_ip)
server_id = server.server_id
hostname = server.hostname
except Exception as e:
logger.error(
f'Cannot get info about {server_ip} for history',
server_ip='Roxy-WI server',
exception=e
)
return
try:
history_sql.insert_action_history(service, action, server_id, user_id, user_ip, server_ip, hostname)
except Exception as e:
logger.error(
'Cannot save a history',
server_ip='Roxy-WI server',
exception=e,
service=service,
action=action
)
def get_dick_permit(**kwargs):
group_id = get_user_group(id=1)
if check_user_group_for_flask():
try:
servers = server_sql.get_dick_permit(group_id, **kwargs)
except Exception as e:
raise Exception(e)
else:
return servers
else:
logging('Roxy-WI server', 'warning: has tried to actions in not his group')
return []
def get_users_params(**kwargs):
user_data = get_jwt_token_claims()
try:
user_id = user_data['user_id']
user = user_sql.get_user_id(user_id)
except Exception:
raise Exception('error: Cannot get user id')
if int(user_data['group']) != int(user.group_id):
raise Exception('error: Wrong active group')
try:
role = user_sql.get_role_id(user_id, user.group_id)
except Exception as e:
raise Exception(f'error: Cannot get user role {e}')
try:
user_services = user_sql.select_user_services(user_id)
except Exception as e:
raise Exception(f'error: Cannot get user services {e}')
if kwargs.get('virt') and kwargs.get('service') == 'haproxy':
servers = get_dick_permit(virt=1, haproxy=1)
elif kwargs.get('virt'):
servers = get_dick_permit(virt=1)
elif kwargs.get('disable'):
servers = get_dick_permit(disable=0)
elif kwargs.get('service'):
servers = get_dick_permit(service=kwargs.get('service'))
else:
servers = get_dick_permit()
user_lang = get_user_lang_for_flask()
user_params = {
'user': user.username,
'role': role,
'servers': servers,
'user_services': user_services,
'lang': user_lang,
'user_id': user_id,
'group_id': user.group_id
}
return user_params
def get_user_lang_for_flask() -> str:
try:
user_lang = request.cookies.get('lang')
except Exception:
return 'en'
if user_lang is None:
user_lang = 'en'
return user_lang
def return_user_status() -> dict:
user_subscription = {}
user_subscription.setdefault('user_status', roxy_sql.get_user().Status)
user_subscription.setdefault('user_plan', roxy_sql.get_user().Plan)
return user_subscription
def return_unsubscribed_user_status() -> dict:
user_subscription = {'user_status': 0, 'user_plan': 0}
return user_subscription
def return_user_subscription():
try:
user_subscription = return_user_status()
except Exception as e:
user_subscription = return_unsubscribed_user_status()
logging('Roxy-WI server', f'Cannot get a user plan: {e}')
return user_subscription
def handle_exceptions(ex: Exception, server_ip: str, message: str, **kwargs: Any) -> None:
"""
:param server_ip:
:param ex: The exception that was caught
:param message: The error message to be logged and raised
:param kwargs: Additional keyword arguments to be passed to the logging function
:return: None
"""
log_error(ex, server_ip, message, kwargs.get('keep_history', False), kwargs.get('service'))
raise Exception(f'{message}: {ex}')
def is_user_has_access_to_its_group(user_id: int) -> None:
if not user_sql.check_user_group(user_id, g.user_params['group_id']) and g.user_params['role'] != 1:
raise RoxywiGroupMismatch
def is_user_has_access_to_group(user_id: int, group_id: int) -> None:
if not user_sql.check_user_group(user_id, group_id) and g.user_params['role'] != 1:
raise RoxywiGroupMismatch
def handle_json_exceptions(ex: Exception, message: str, server_ip='Roxy-WI server') -> dict:
"""
Handle an exception and return a JSON error response.
Args:
ex: The exception that was raised
message: Additional information to include in the response
server_ip: The IP of the server where the error occurred
Returns:
A dictionary containing the error response
"""
log_error(ex, server_ip, message)
return ErrorResponse(error=f'{message}: {ex}').model_dump(mode='json')
def handler_exceptions_for_json_data(ex: Exception, main_ex_mes: str = '') -> tuple[dict, int]:
"""
Handle an exception and return a JSON error response with an appropriate HTTP status code.
Args:
ex: The exception that was raised
main_ex_mes: Additional information to include in the response
Returns:
A tuple containing the error response and HTTP status code
"""
# If main_ex_mes is provided, use it as additional_info
additional_info = main_ex_mes if main_ex_mes else ""
# Use the centralized error handler
return handle_exception(ex, additional_info=additional_info)