v8.1.2: Refactor IP validation and type annotations in routes

Replaced common.is_ip_or_dns() with pydantic type validation in various route functions to ensure input consistency and clarity. Updated several methods to use Union[IPvAnyAddress, DomainName] for typing and added necessary imports. Also refined templates and helper functions for better readability.
pull/403/head
Aidaho 2024-11-30 10:21:03 +03:00
parent ac5a6093a5
commit ddd9a1b92d
7 changed files with 89 additions and 106 deletions

View File

@ -1,6 +1,7 @@
import os
from typing import Union
from peewee import IntegrityError
from flask import render_template, make_response
import app.modules.db.sql as sql
@ -8,18 +9,21 @@ import app.modules.db.user as user_sql
import app.modules.db.service as service_sql
import app.modules.roxywi.common as roxywi_common
import app.modules.tools.alerting as alerting
from app.modules.db.common import not_unique_error
def create_user(new_user: str, email: str, password: str, role: int, enabled: int, group: int) -> Union[int, tuple]:
try:
user_id = user_sql.add_user(new_user, email, password, role, enabled, group)
roxywi_common.logging(f'a new user {new_user}', 'has been created', roxywi=1, login=1)
except IntegrityError as e:
not_unique_error(e)
except Exception as e:
return roxywi_common.handler_exceptions_for_json_data(e, 'Cannot create a new user')
raise e
try:
user_sql.update_user_role(user_id, group, role)
except Exception as e:
return roxywi_common.handler_exceptions_for_json_data(e, 'Cannot update user role')
raise e
try:
if password == 'aduser':
password = 'your domain password'
@ -126,7 +130,6 @@ def get_ldap_email(username) -> str:
ldap_class_search = sql.get_setting('ldap_class_search')
ldap_user_attribute = sql.get_setting('ldap_user_attribute')
ldap_type = sql.get_setting('ldap_type')
ldap_proto = 'ldap' if ldap_type == "0" else 'ldaps'
try:
@ -137,8 +140,7 @@ def get_ldap_email(username) -> str:
try:
ldap_bind.protocol_version = ldap.VERSION3
ldap_bind.set_option(ldap.OPT_REFERRALS, 0)
_ = ldap_bind.simple_bind_s(user, password)
ldap_bind.simple_bind_s(user, password)
criteria = f"(&(objectClass={ldap_class_search})({ldap_user_attribute}={username}))"
attributes = [ldap_search_field]

View File

@ -1,5 +1,9 @@
from typing import Union
from flask import render_template, request, g
from flask_jwt_extended import jwt_required
from flask_pydantic import validate
from pydantic import IPvAnyAddress
from app.routes.runtime import bp
from app.middleware import get_user_params
@ -7,6 +11,7 @@ import app.modules.common.common as common
import app.modules.config.runtime as runtime
import app.modules.db.server as server_sql
import app.modules.service.haproxy as service_haproxy
from app.modules.roxywi.class_models import DomainName, EscapedString
@bp.before_request
@ -24,37 +29,31 @@ def runtimeapi():
@bp.route('/backends/<int:server_ip>')
@bp.route('/backends/<server_ip>')
def show_backends(server_ip):
if isinstance(server_ip, str):
server_ip = common.is_ip_or_dns(server_ip)
elif isinstance(server_ip, int):
@validate()
def show_backends(server_ip: Union[IPvAnyAddress, DomainName, int]):
if isinstance(server_ip, int):
server = server_sql.get_server(server_ip)
server_ip = server.ip
try:
return runtime.show_backends(server_ip)
return runtime.show_backends(str(server_ip))
except Exception as e:
return f'{e}'
@bp.route('/backend/servers/<server_ip>/<backend>')
def show_backend_servers(server_ip, backend):
server_ip = common.is_ip_or_dns(server_ip)
backend = common.checkAjaxInput(backend)
@validate()
def show_backend_servers(server_ip: Union[IPvAnyAddress, DomainName], backend: EscapedString):
try:
return runtime.show_frontend_backend(server_ip, backend)
return runtime.show_frontend_backend(str(server_ip), backend)
except Exception as e:
return f'{e}'
@bp.route('/backend/server/<server_ip>/<backend>/<backend_server>')
def show_backend_server(server_ip, backend, backend_server):
server_ip = common.is_ip_or_dns(server_ip)
backend = common.checkAjaxInput(backend)
backend_server = common.checkAjaxInput(backend_server)
@validate()
def show_backend_server(server_ip: Union[IPvAnyAddress, DomainName], backend: EscapedString, backend_server: EscapedString):
try:
return runtime.show_server(server_ip, backend, backend_server)
return runtime.show_server(str(server_ip), backend, backend_server)
except Exception as e:
return f'{e}'
@ -83,22 +82,21 @@ def change_ip_port():
@bp.route('/maxconn/<server_ip>')
def maxconn_select(server_ip):
server_ip = common.is_ip_or_dns(server_ip)
@validate()
def maxconn_select(server_ip: Union[IPvAnyAddress, DomainName]):
try:
return runtime.get_backends_from_config(server_ip, backends='frontend')
return runtime.get_backends_from_config(str(server_ip), backends='frontend')
except Exception as e:
return f'{e}'
@bp.route('/maxconn/<type_maxconn>/<server_ip>', methods=['POST'])
def change_maxconn(type_maxconn, server_ip):
server_ip = common.is_ip_or_dns(server_ip)
@validate()
def change_maxconn(type_maxconn: str, server_ip: Union[IPvAnyAddress, DomainName]):
server_ip = str(server_ip)
maxconn = common.checkAjaxInput(request.form.get('maxconn'))
if type_maxconn == 'global':
try:
return runtime.change_maxconn_global(server_ip, maxconn)
except Exception as e:
@ -124,121 +122,105 @@ def change_maxconn(type_maxconn, server_ip):
@bp.route('/action/<server_ip>', methods=['POST'])
@get_user_params()
def action(server_ip):
server_ip = common.is_ip_or_dns(server_ip)
@validate()
def action(server_ip: Union[IPvAnyAddress, DomainName]):
enable = common.checkAjaxInput(request.form.get('servaction'))
backend = common.checkAjaxInput(request.form.get('servbackend'))
save = request.form.get('save')
try:
return service_haproxy.runtime_command(server_ip, enable, backend, save)
return service_haproxy.runtime_command(str(server_ip), enable, backend, save)
except Exception as e:
return f'{e}'
@bp.post('/stats/action/<server_ip>')
@get_user_params()
def stat_page_action(server_ip):
@validate()
def stat_page_action(server_ip: Union[IPvAnyAddress, DomainName]):
try:
return service_haproxy.stat_page_action(server_ip, g.user_params['group_id'])
return service_haproxy.stat_page_action(str(server_ip), g.user_params['group_id'])
except Exception as e:
return f'{e}'
@bp.route('/tables/<server_ip>')
def get_all_tables(server_ip):
server_ip = common.is_ip_or_dns(server_ip)
@validate()
def get_all_tables(server_ip: Union[IPvAnyAddress, DomainName]):
try:
return runtime.get_all_stick_table(server_ip)
return runtime.get_all_stick_table(str(server_ip))
except Exception as e:
return f'{e}'
@bp.route('/table/<server_ip>/<table>')
def get_table(server_ip, table):
server_ip = common.is_ip_or_dns(server_ip)
table = common.checkAjaxInput(table)
@validate()
def get_table(server_ip: Union[IPvAnyAddress, DomainName], table: EscapedString):
try:
return runtime.table_select(server_ip, table)
return runtime.table_select(str(server_ip), table)
except Exception as e:
return f'{e}'
@bp.route('/table/delete/<server_ip>/<table>/<ip_for_delete>')
def delete_ip(server_ip, table, ip_for_delete):
server_ip = common.is_ip_or_dns(server_ip)
table = common.checkAjaxInput(table)
ip_for_delete = common.is_ip_or_dns(ip_for_delete)
@validate()
def delete_ip(server_ip: Union[IPvAnyAddress, DomainName], table: EscapedString, ip_for_delete: Union[IPvAnyAddress, DomainName]):
try:
return runtime.delete_ip_from_stick_table(server_ip, ip_for_delete, table)
return runtime.delete_ip_from_stick_table(str(server_ip), str(ip_for_delete), table)
except Exception as e:
return f'{e}'
@bp.route('/table/clear/<server_ip>/<table>')
def clear_table(server_ip, table):
server_ip = common.is_ip_or_dns(server_ip)
table = common.checkAjaxInput(table)
@validate()
def clear_table(server_ip: Union[IPvAnyAddress, DomainName], table: EscapedString):
try:
return runtime.clear_stick_table(server_ip, table)
return runtime.clear_stick_table(str(server_ip), table)
except Exception as e:
return f'{e}'
@bp.route('/session/<server_ip>')
def select_sessions(server_ip):
server_ip = common.is_ip_or_dns(server_ip)
@validate()
def select_sessions(server_ip: Union[IPvAnyAddress, DomainName]):
try:
return runtime.select_session(server_ip)
return runtime.select_session(str(server_ip))
except Exception as e:
return f'{e}'
@bp.route('/session/<server_ip>/<sess_id>')
def show_sessions(server_ip, sess_id):
server_ip = common.is_ip_or_dns(server_ip)
sess_id = common.checkAjaxInput(sess_id)
@validate()
def show_sessions(server_ip: Union[IPvAnyAddress, DomainName], sess_id: EscapedString):
try:
return runtime.show_session(server_ip, sess_id)
return runtime.show_session(str(server_ip), sess_id)
except Exception as e:
return f'{e}'
@bp.route('/session/delete/<server_ip>/<sess_id>')
def delete_session(server_ip, sess_id):
server_ip = common.is_ip_or_dns(server_ip)
sess_id = common.checkAjaxInput(sess_id)
@validate()
def delete_session(server_ip: Union[IPvAnyAddress, DomainName], sess_id: EscapedString):
try:
return runtime.delete_session(server_ip, sess_id)
return runtime.delete_session(str(server_ip), sess_id)
except Exception as e:
return f'{e}'
@bp.route('/list/<server_ip>')
def get_lists(server_ip):
server_ip = common.is_ip_or_dns(server_ip)
@validate()
def get_lists(server_ip: Union[IPvAnyAddress, DomainName]):
try:
return runtime.list_of_lists(server_ip)
return runtime.list_of_lists(str(server_ip))
except Exception as e:
return f'{e}'
@bp.route('/list/<server_ip>/<int:list_id>/<color>/<list_name>')
def get_list(server_ip, list_id, color, list_name):
server_ip = common.is_ip_or_dns(server_ip)
list_name = common.checkAjaxInput(list_name)
color = common.checkAjaxInput(color)
@validate()
def get_list(server_ip: Union[IPvAnyAddress, DomainName], list_id: int, color: EscapedString, list_name: EscapedString):
try:
return runtime.show_lists(server_ip, list_id, color, list_name)
return runtime.show_lists(str(server_ip), list_id, color, list_name)
except Exception as e:
return f'{e}'

View File

@ -1,6 +1,10 @@
from typing import Union
import distro
from flask import render_template, request, g
from flask_jwt_extended import jwt_required, get_jwt
from flask_pydantic import validate
from pydantic import IPvAnyAddress
from app import cache
from app.routes.service import bp
@ -18,6 +22,7 @@ import app.modules.roxywi.common as roxywi_common
import app.modules.roxywi.overview as roxy_overview
from app.views.service.views import ServiceActionView, ServiceBackendView, ServiceView
from app.views.service.lets_encrypt_views import LetsEncryptView, LetsEncryptsView
from app.modules.roxywi.class_models import DomainName
bp.add_url_rule('/<service>/<server_id>/<any(start, stop, reload, restart):action>', view_func=ServiceActionView.as_view('service_action_ip'), methods=['GET'])
bp.add_url_rule('/<service>/<int:server_id>/<any(start, stop, reload, restart):action>', view_func=ServiceActionView.as_view('service_action'), methods=['GET'])
@ -126,13 +131,15 @@ def services(service, serv):
@bp.route('/<service>/<server_ip>/last-edit')
@check_services
def last_edit(service, server_ip):
return service_common.get_overview_last_edit(server_ip, service)
@validate()
def last_edit(service: str, server_ip: Union[IPvAnyAddress, DomainName]):
return service_common.get_overview_last_edit(str(server_ip), service)
@bp.route('/cpu-ram-metrics/<server_ip>/<server_id>/<name>/<service>')
@get_user_params()
def cpu_ram_metrics(server_ip, server_id, name, service):
@validate()
def cpu_ram_metrics(server_ip: Union[IPvAnyAddress, DomainName], server_id: int, name: str, service: str):
if service == 'haproxy':
sock_port = sql.get_setting('haproxy_sock_port')
cmd = f'echo "show info" |nc {server_ip} {sock_port} -w 1|grep -e "node\|Nbproc\|Maxco\|MB\|Nbthread"'
@ -149,7 +156,7 @@ def cpu_ram_metrics(server_ip, server_id, name, service):
else:
return_out = ''
servers = [[name, server_ip, return_out]]
servers = [[name, str(server_ip), return_out]]
claims = get_jwt()
kwargs = {
'service_status': sorted(servers, key=common.get_key),
@ -266,9 +273,9 @@ def update_tools_enable(service):
@bp.route('/check-restart/<server_ip>')
def check_restart(server_ip):
server_ip = common.is_ip_or_dns(server_ip)
servers = roxywi_common.get_dick_permit(ip=server_ip)
@validate()
def check_restart(server_ip: Union[IPvAnyAddress, DomainName]):
servers = roxywi_common.get_dick_permit(ip=str(server_ip))
for server in servers:
if server != "":
return 'ok'

View File

@ -59,7 +59,7 @@ $( function() {
let allFields = $([]).add(username_div);
allFields.removeClass("ui-state-error");
valid = valid && checkLength(username_div, "user name", 1);
let user = username_div.val()
let user = username_div.val();
if (valid) {
$.ajax({
url: "/user/ldap/" + user,
@ -70,7 +70,7 @@ $( function() {
$('#new-email').val('');
username_div.attr('readonly', false);
} else {
let json = $.parseJSON(data);
let json = $.parseJSON(data.user);
toastr.clear();
if (!user.includes('@')) {
username_div.val(user + '@' + json[1]);

View File

@ -319,7 +319,6 @@ function getTable() {
} else {
$("#ajaxtable").html(data);
$("input[type=submit], button").button();
$.getScript(script);
$.getScript(overview);
$.getScript(awesome);
}
@ -337,7 +336,6 @@ function getList() {
} else {
$("#ajaxlist").html(data);
$("input[type=submit], button").button();
$.getScript(script);
$.getScript(overview);
$.getScript(awesome);
}
@ -407,7 +405,6 @@ function getSessions() {
} else {
$("#ajaxsessions").html(data);
$("input[type=submit], button").button();
$.getScript(script);
$.getScript(overview);
$.getScript(awesome);
}

View File

@ -1,8 +1,5 @@
{% import 'languages/'+lang|default('en')+'.html' as lang %}
{% from 'include/input_macros.html' import input, select, checkbox %}
{% if user_subscription['user_status'] == 0 %}
{% include 'include/no_sub.html' %}
{% else %}
<table id="checker_telegram_table" class="overview-overflow">
<caption><i class="fab fa-telegram caption-icon"></i><h3>Telegram {{lang.words.channels}}</h3></caption>
<tr class="overviewHead" style="width: 50%;">
@ -25,7 +22,7 @@
</td>
<td>
{% set id = 'telegram-chanel-' + telegram.id|string() %}
{{ input(id, value=telegram.chanel_name, size='30') }}
{{ input(id, value=telegram.chanel_name.replace("'", ""), size='30') }}
</td>
{% if user_params['role']|int() == 1 %}
<td>
@ -77,7 +74,7 @@
</td>
<td>
{% set id = 'slack-chanel-' + slack.id|string() %}
{{ input(id, value=slack.chanel_name, size='30') }}
{{ input(id, value=slack.chanel_name.replace("'", ""), size='30') }}
</td>
{% if user_params['role']|int() == 1 %}
<td>
@ -129,7 +126,7 @@
</td>
<td>
{% set id = 'pd-chanel-' + pd.id|string() %}
{{ input(id, value=pd.chanel_name, size='30') }}
{{ input(id, value=pd.chanel_name.replace("'", ""), size='30') }}
</td>
{% if user_params['role']|int() == 1 %}
<td>
@ -157,7 +154,7 @@
</tr>
{% endfor %}
</table>
<br /><span class="add-button" title="{{lang.words.add|title()}} PagerDuty {{lang.words.channel}}" id="add-pd-button">+ {{lang.words.add|title()}}</span>
<br /><span class="add-button" title="{{lang.words.add|title()}} PagerDuty {{lang.words.channel|title()}}" id="add-pd-button">+ {{lang.words.add|title()}}</span>
<br /><br />
<table id="checker_mm_table" class="overview-overflow">
<caption><i class="fas fa-power-off caption-icon"></i><h3>Mattermost {{lang.words.channels|title()}}</h3></caption>
@ -165,7 +162,7 @@
<td class="padding10 first-collumn" style="width: 25%;">
{{lang.words.key|title()}}
</td>
<td style="width: 20%;">{{lang.words.name|title()}}</td>
<td style="width: 20%;">Webhook</td>
{% if user_params['role']|int() == 1 %}
<td style="width: 25%;">{{lang.words.group|title()}}</td>
{% endif %}
@ -181,7 +178,7 @@
</td>
<td>
{% set id = 'mm-chanel-' + mm.id|string() %}
{{ input(id, value=mm.chanel_name, size='30') }}
{{ input(id, value=mm.chanel_name.replace("'", ""), size='30') }}
</td>
{% if user_params['role']|int() == 1 %}
<td>
@ -211,7 +208,7 @@
</table>
<br /><span class="add-button" title="{{lang.words.add|title()}} Mattermost {{lang.words.channel}}" id="add-mm-button">+ {{lang.words.add|title()}}</span>
<br /><br />
<table class="overflow">
<table class="overflow_div">
<caption><i class="fas fa-envelope-open-text caption-icon"></i><h3>{{lang.words.test2|title()}} {{lang.words.message}}</h3></caption>
<tr class="overviewHead">
<td class="padding10 first-collumn" style="width: 45%">{{lang.words.email|title()}}</td>
@ -229,10 +226,8 @@
<br /><br />
<div id="ajax-telegram"></div>
<div class="add-note alert addName alert-info" style="width: inherit; margin-right: 15px;">
{{lang.phrases.read_about_parameters}} <a href="https://roxy-wi.org/description/checker" title="Servers description" target="_blank">{{lang.words.here}}</a>,
<a href="https://roxy-wi.org/howto/create-telegram-bot" title="How to create Telegram bot and use it with Roxy-WI" target="_blank">{{lang.phrases.howto_user}} Telegram bot</a>,
<a href="https://roxy-wi.org/howto/create-slack-app" title="How to create Slack APP and use it with Roxy-WI" target="_blank">{{lang.phrases.howto_user}} Slack APP</a>,
<a href="https://roxy-wi.org/howto/create-pd-integration" title="How to create PagerDuty integration and use it with Roxy-WI" target="_blank">{{lang.phrases.howto_user}} PagerDuty</a>.
<a href="https://rmon.io/howto/create-telegram-bot" title="How to create Telegram bot and use it with RMON" target="_blank">{{lang.phrases.howto_user}} Telegram bot</a>,
<a href="https://rmon.io/howto/create-slack-app" title="How to create Slack APP and use it with RMON" target="_blank">{{lang.phrases.howto_user}} Slack APP</a>,
<a href="https://rmon.io/howto/create-pd-integration" title="How to create PagerDuty integration and use it with RMON" target="_blank">{{lang.phrases.howto_user}} PagerDuty</a>.
</div>
{% endif %}
{% include 'include/admins_dialogs.html' %}

View File

@ -154,11 +154,11 @@ class UserView(MethodView):
description: The ID of the created user
"""
if g.user_params['role'] > body.role_id:
return roxywi_common.handle_json_exceptions('Wrong role', 'Cannot create user')
return roxywi_common.handler_exceptions_for_json_data(Exception('Wrong role'), 'Cannot create user')
try:
user_id = roxywi_user.create_user(body.username, body.email, body.password, body.role_id, body.enabled, body.group_id)
except Exception as e:
return roxywi_common.handle_json_exceptions(e, 'Cannot create a new user')
return roxywi_common.handler_exceptions_for_json_data(e, 'Cannot create a new user')
else:
if self.is_api:
return IdResponse(id=user_id), 201