From ac5a6093a5275fd8b90efa3534e23970b696fe25 Mon Sep 17 00:00:00 2001 From: Aidaho Date: Tue, 19 Nov 2024 15:34:14 +0300 Subject: [PATCH] v8.1.2: Refactor IP validation and type annotations in routes Replaced common.is_ip_or_dns() with pydantic type validation in various route functions to ensure input consistency and clarity. Updated several methods to use Union[IPvAnyAddress, DomainName] for typing and added necessary imports. Also refined templates and helper functions for better readability. --- app/modules/server/server.py | 3 +- app/modules/service/common.py | 2 -- app/routes/add/routes.py | 32 +++++++++-------- app/routes/install/routes.py | 28 +++++++-------- app/routes/main/routes.py | 14 ++++---- app/routes/server/routes.py | 37 ++++++++++---------- app/templates/ajax/show_server_services.html | 36 +++++++++---------- 7 files changed, 75 insertions(+), 77 deletions(-) diff --git a/app/modules/server/server.py b/app/modules/server/server.py index 1c9cc05c..bf46c9af 100644 --- a/app/modules/server/server.py +++ b/app/modules/server/server.py @@ -108,7 +108,6 @@ def get_remote_files(server_ip: str, config_dir: str, file_format: str): def get_system_info(server_ip: str) -> None: - server_ip = common.is_ip_or_dns(server_ip) if server_ip == '': raise Exception('IP cannot be empty') @@ -482,7 +481,7 @@ def server_is_up(server_ip: str) -> str: def show_server_services(server_id: int) -> str: - server = server_sql.select_servers(id=server_id) + server = server_sql.get_server(server_id) lang = roxywi_common.get_user_lang_for_flask() return render_template('ajax/show_server_services.html', server=server, lang=lang) diff --git a/app/modules/service/common.py b/app/modules/service/common.py index 410cd43a..039b7d1f 100644 --- a/app/modules/service/common.py +++ b/app/modules/service/common.py @@ -9,7 +9,6 @@ import app.modules.db.sql as sql import app.modules.db.user as user_sql import app.modules.db.server as server_sql import app.modules.db.service as service_sql -import app.modules.common.common as common import app.modules.server.server as server_mod import app.modules.roxywi.common as roxywi_common import app.modules.config.section as section_mod @@ -75,7 +74,6 @@ def is_not_allowed_to_restart(server_id: int, service: str, action: str) -> int: def get_exp_version(server_ip: str, service_name: str) -> str: - server_ip = common.is_ip_or_dns(server_ip) if service_name == 'haproxy': command = "/opt/prometheus/exporters/haproxy_exporter --version 2>&1 |head -1|awk '{print $3}'" elif service_name == 'nginx': diff --git a/app/routes/add/routes.py b/app/routes/add/routes.py index d432a632..4b0a5b88 100644 --- a/app/routes/add/routes.py +++ b/app/routes/add/routes.py @@ -1,8 +1,10 @@ import os +from typing import Union from flask import render_template, request, jsonify, redirect, url_for, g from flask_jwt_extended import jwt_required, get_jwt from flask_pydantic import validate +from pydantic import IPvAnyAddress from app.modules.roxywi.class_models import SSLCertUploadRequest, DataStrResponse, SavedServerRequest, BaseResponse from app.routes.add import bp @@ -16,6 +18,7 @@ import app.modules.roxywi.common as roxywi_common import app.modules.roxy_wi_tools as roxy_wi_tools from app.views.service.haproxy_section_views import (GlobalSectionView, DefaultsSectionView, ListenSectionView, UserListSectionView, PeersSectionView) +from app.modules.roxywi.class_models import DomainName get_config = roxy_wi_tools.GetConfigVar() @@ -123,12 +126,12 @@ def save_bwlist(): @bp.route('/haproxy/bwlist/delete////') -def delete_bwlist(server_ip, color, name, group): - server_ip = common.is_ip_or_dns(server_ip) +@validate() +def delete_bwlist(server_ip: Union[IPvAnyAddress, DomainName], color, name, group): color = common.checkAjaxInput(color) list_name = common.checkAjaxInput(name) - return add_mod.delete_bwlist(list_name, color, group, server_ip) + return add_mod.delete_bwlist(list_name, color, group, str(server_ip)) @bp.route('/haproxy/bwlist///') @@ -225,23 +228,22 @@ def delete_saved_server(server_id): @bp.route('/certs/') @bp.route('/certs/') -def get_certs(server_ip): - if isinstance(server_ip, str): - server_ip = common.is_ip_or_dns(server_ip) - elif isinstance(server_ip, int): +@validate() +def get_certs(server_ip: Union[IPvAnyAddress, DomainName, int]): + if isinstance(server_ip, int): server = server_sql.get_server(server_ip) server_ip = server.ip - return add_mod.get_ssl_certs(server_ip) + return add_mod.get_ssl_certs(str(server_ip)) @bp.route('/cert//', methods=['DELETE', 'GET']) -def get_cert(server_ip, cert_id): - server_ip = common.is_ip_or_dns(server_ip) +@validate() +def get_cert(server_ip: Union[IPvAnyAddress, DomainName], cert_id): cert_id = common.checkAjaxInput(cert_id) if request.method == 'DELETE': - return add_mod.del_ssl_cert(server_ip, cert_id) + return add_mod.del_ssl_cert(str(server_ip), cert_id) elif request.method == 'GET': - return add_mod.get_ssl_cert(server_ip, cert_id) + return add_mod.get_ssl_cert(str(server_ip), cert_id) @bp.post('/cert/add') @@ -255,10 +257,10 @@ def upload_cert(body: SSLCertUploadRequest): @bp.route('/cert/get/raw//') -def get_cert_raw(server_ip, cert_id): - server_ip = common.is_ip_or_dns(server_ip) +@validate() +def get_cert_raw(server_ip: Union[IPvAnyAddress, DomainName], cert_id): cert_id = common.checkAjaxInput(cert_id) - return add_mod.get_ssl_raw_cert(server_ip, cert_id) + return add_mod.get_ssl_raw_cert(str(server_ip), cert_id) @bp.route('/map', methods=['POST', 'PUT', 'DELETE', 'GET']) diff --git a/app/routes/install/routes.py b/app/routes/install/routes.py index d42c84fa..9e7d54ea 100644 --- a/app/routes/install/routes.py +++ b/app/routes/install/routes.py @@ -1,5 +1,9 @@ +from typing import Union, Literal + from flask import render_template, request, g, jsonify from flask_jwt_extended import jwt_required +from flask_pydantic import validate +from pydantic import IPvAnyAddress from app.routes.install import bp from app.middleware import get_user_params @@ -12,6 +16,7 @@ import app.modules.service.common as service_common import app.modules.service.installation as service_mod import app.modules.service.exporter_installation as exp_installation from app.views.install.views import InstallView +from app.modules.roxywi.class_models import DomainName bp.add_url_rule( @@ -62,17 +67,16 @@ def install_exporter(exporter): @bp.route('/exporter//version/') -def get_exporter_version(exporter, server_ip): - server_ip = common.is_ip_or_dns(server_ip) - return service_common.get_exp_version(server_ip, exporter) +@validate() +def get_exporter_version(exporter: str, server_ip: Union[IPvAnyAddress, DomainName]): + return service_common.get_exp_version(str(server_ip), exporter) @bp.post('/waf//') -def install_waf(service, server_ip): - server_ip = common.is_ip_or_dns(server_ip) - +@validate() +def install_waf(service: str, server_ip: Union[IPvAnyAddress, DomainName]): try: - inv, server_ips = service_mod.generate_waf_inv(server_ip, service) + inv, server_ips = service_mod.generate_waf_inv(str(server_ip), service) except Exception as e: return jsonify({'status': 'failed', 'error': f'Cannot create inventory: {e}'}) try: @@ -113,12 +117,8 @@ def install_geoip(): @bp.route('/geoip//') -def check_geoip(service, server_ip): - server_ip = common.is_ip_or_dns(server_ip) - - if service not in ('haproxy', 'nginx'): - return 'error: Wrong service' - +@validate() +def check_geoip(service: Literal['haproxy', 'nginx'], server_ip: Union[IPvAnyAddress, DomainName]): service_dir = common.return_nice_path(sql.get_setting(f'{service}_dir')) cmd = f"ls {service_dir}geoip/" - return server_mod.ssh_command(server_ip, cmd) + return server_mod.ssh_command(str(server_ip), cmd) diff --git a/app/routes/main/routes.py b/app/routes/main/routes.py index 797dd663..40287546 100644 --- a/app/routes/main/routes.py +++ b/app/routes/main/routes.py @@ -1,9 +1,11 @@ import os +from typing import Union, Literal from flask import render_template, request, g, abort, jsonify, redirect, url_for, send_from_directory from flask_jwt_extended import jwt_required from flask_pydantic.exceptions import ValidationError from flask_pydantic import validate +from pydantic import IPvAnyAddress from app import app, cache, jwt from app.routes.main import bp @@ -19,7 +21,7 @@ import app.modules.roxywi.nettools as nettools_mod import app.modules.roxywi.common as roxywi_common import app.modules.service.common as service_common import app.modules.service.haproxy as service_haproxy -from app.modules.roxywi.class_models import ErrorResponse, NettoolsRequest +from app.modules.roxywi.class_models import ErrorResponse, NettoolsRequest, DomainName @app.template_filter('strftime') @@ -133,9 +135,8 @@ def stats(service, serv): @jwt_required() @check_services @get_user_params() -def show_stats(service, server_ip): - server_ip = common.is_ip_or_dns(server_ip) - +@validate() +def show_stats(service: Literal['haproxy', 'apache', 'nginx'], server_ip: Union[IPvAnyAddress, DomainName]): if service in ('nginx', 'apache'): try: return service_common.get_stat_page(server_ip, service, g.user_params['group_id']) @@ -197,9 +198,10 @@ def nettools_check(check, body: NettoolsRequest): @bp.route('/history//') @jwt_required() @get_user_params() -def service_history(service, server_ip): +@validate() +def service_history(service: str, server_ip: Union[IPvAnyAddress, DomainName]): history = '' - server_ip = common.checkAjaxInput(server_ip) + server_ip = str(server_ip) if service in ('haproxy', 'nginx', 'keepalived', 'apache', 'cluster', 'udp'): service_desc = service_sql.select_service(service) diff --git a/app/routes/server/routes.py b/app/routes/server/routes.py index 6080e486..2a2ea19f 100644 --- a/app/routes/server/routes.py +++ b/app/routes/server/routes.py @@ -1,7 +1,10 @@ import json +from typing import Union from flask import render_template, request, g, jsonify from flask_jwt_extended import jwt_required +from flask_pydantic import validate +from pydantic import IPvAnyAddress from app.routes.server import bp import app.modules.db.cred as cred_sql @@ -16,6 +19,7 @@ from app.middleware import get_user_params from app.views.server.views import ServerView, ServerGroupView, ServerGroupsView, ServerIPView from app.views.server.cred_views import CredView, CredsView from app.views.server.backup_vews import BackupView, S3BackupView, GitBackupView +from app.modules.roxywi.class_models import DomainName def register_api(view, endpoint, url, pk='listener_id', pk_type='int'): @@ -47,12 +51,12 @@ def before_request(): @bp.route('/check/ssh/') -def check_ssh(server_ip): +@validate() +def check_ssh(server_ip: Union[IPvAnyAddress, DomainName]): roxywi_auth.page_for_admin(level=2) - server_ip = common.is_ip_or_dns(server_ip) try: - return server_mod.ssh_command(server_ip, "ls -1t") + return server_mod.ssh_command(str(server_ip), "ls -1t") except Exception as e: return str(e) @@ -82,12 +86,12 @@ def check_server(server_id): @bp.route('/show/if/') -def show_if(server_ip): +@validate() +def show_if(server_ip: Union[IPvAnyAddress, DomainName]): roxywi_auth.page_for_admin(level=2) - server_ip = common.is_ip_or_dns(server_ip) command = "sudo ip link|grep 'UP' |grep -v 'lo'| awk '{print $2}' |awk -F':' '{print $1}'" - return server_mod.ssh_command(server_ip, command) + return server_mod.ssh_command(str(server_ip), command) @bp.app_template_filter('string_to_dict') @@ -97,17 +101,15 @@ def string_to_dict(dict_string) -> dict: @bp.route('/system_info/get//') -def get_system_info(server_ip, server_id): - server_ip = common.is_ip_or_dns(server_ip) - - return server_mod.show_system_info(server_ip, server_id) +@validate() +def get_system_info(server_ip: Union[IPvAnyAddress, DomainName], server_id: int): + return server_mod.show_system_info(str(server_ip), server_id) @bp.route('/system_info/update//') -def update_system_info(server_ip, server_id): - server_ip = common.is_ip_or_dns(server_ip) - - return server_mod.update_system_info(server_ip, server_id) +@validate() +def update_system_info(server_ip: Union[IPvAnyAddress, DomainName], server_id): + return server_mod.update_system_info(str(server_ip), server_id) @bp.route('/services/', methods=['GET', 'POST']) @@ -124,12 +126,11 @@ def show_server_services(server_id): @bp.route('/firewall/') -def show_firewall(server_ip): +@validate() +def show_firewall(server_ip: Union[IPvAnyAddress, DomainName]): roxywi_auth.page_for_admin(level=2) - server_ip = common.is_ip_or_dns(server_ip) - - return server_mod.show_firewalld_rules(server_ip) + return server_mod.show_firewalld_rules(str(server_ip)) @bp.route('/backup', methods=['GET']) diff --git a/app/templates/ajax/show_server_services.html b/app/templates/ajax/show_server_services.html index 94b80767..43a8a29d 100644 --- a/app/templates/ajax/show_server_services.html +++ b/app/templates/ajax/show_server_services.html @@ -2,62 +2,58 @@ - {% for s in server %} - {% if s.15|int() == 0 %} - + {% if server.haproxy|int() == 0 %} + {% endif %} - {% if s.14|int() == 0 %} - + {% if server.nginx|int() == 0 %} + {% endif %} - {% if s.24|int() == 0 %} - + {% if server.apache|int() == 0 %} + {% endif %} - {% if s.13|int() == 0 %} - + {% if server.keepalived|int() == 0 %} + {% endif %} - {% endfor %}
{{lang.words.all|title()}} {{lang.words.services}}
HAProxy +
NGINX +
Apache +
Keepalived +
- {% for s in server %} - {% if s.15|int() == 1 %} - + {% if server.haproxy|int() == 1 %} + {% endif %} - {% if s.14|int() == 1 %} - + {% if server.nginx|int() == 1 %} + {% endif %} - {% if s.24|int() == 1 %} - + {% if server.apache|int() == 1 %} + {% endif %} - {% if s.13|int() == 1 %} - + {% if server.keepalived|int() == 1 %} + {% endif %} - {% endfor %}
{{lang.words.actives|title()}} {{lang.words.services}}
HAProxy -
NGINX -
Apache -
Keepalived -