mirror of https://github.com/Aidaho12/haproxy-wi
v8.1.8: Refactor JWT claims retrieval into a reusable function
Replaced repeated JWT validation and claim extraction code with a centralized `get_jwt_token_claims` function. This improves code maintainability, reduces redundancy, and ensures consistent JWT handling across the application.pull/418/head
parent
fc5d4f72a1
commit
0909fe8022
|
@ -2,8 +2,6 @@ from typing import Union
|
|||
|
||||
from flask import request, abort, url_for, jsonify
|
||||
from flask_jwt_extended import create_access_token, set_access_cookies
|
||||
from flask_jwt_extended import get_jwt
|
||||
from flask_jwt_extended import verify_jwt_in_request
|
||||
|
||||
import app.modules.db.sql as sql
|
||||
import app.modules.db.user as user_sql
|
||||
|
@ -29,8 +27,7 @@ def check_login(user_id: int) -> Union[str, None]:
|
|||
|
||||
def is_access_permit_to_service(service: str) -> bool:
|
||||
service_id = service_sql.select_service_id_by_slug(service)
|
||||
verify_jwt_in_request()
|
||||
claims = get_jwt()
|
||||
claims = roxywi_common.get_jwt_token_claims()
|
||||
user_services = user_sql.select_user_services(claims['user_id'])
|
||||
if str(service_id) in user_services:
|
||||
return True
|
||||
|
@ -42,8 +39,7 @@ def is_admin(level=1, **kwargs):
|
|||
if kwargs.get('role_id'):
|
||||
role = kwargs.get('role_id')
|
||||
else:
|
||||
verify_jwt_in_request()
|
||||
claims = get_jwt()
|
||||
claims = roxywi_common.get_jwt_token_claims()
|
||||
user_id = claims['user_id']
|
||||
group_id = claims['group']
|
||||
|
||||
|
|
|
@ -22,10 +22,16 @@ from app.modules.roxywi.exception import RoxywiResourceNotFound, RoxywiGroupMism
|
|||
get_config_var = roxy_wi_tools.GetConfigVar()
|
||||
|
||||
|
||||
def get_jwt_token_claims() -> dict:
|
||||
verify_jwt_in_request()
|
||||
claims = get_jwt()
|
||||
claim = {'user_id': claims['user_id'], 'group': claims['group']}
|
||||
return claim
|
||||
|
||||
|
||||
def get_user_group(**kwargs) -> int:
|
||||
try:
|
||||
verify_jwt_in_request()
|
||||
claims = get_jwt()
|
||||
claims = get_jwt_token_claims()
|
||||
user_group_id = claims['group']
|
||||
group = group_sql.get_group(user_group_id)
|
||||
if group.group_id == int(user_group_id):
|
||||
|
@ -43,8 +49,7 @@ def get_user_group(**kwargs) -> int:
|
|||
def check_user_group_for_flask(api_token: bool = False):
|
||||
if api_token:
|
||||
return True
|
||||
verify_jwt_in_request()
|
||||
claims = get_jwt()
|
||||
claims = get_jwt_token_claims()
|
||||
user_id = claims['user_id']
|
||||
group_id = claims['group']
|
||||
|
||||
|
@ -117,8 +122,7 @@ def logging(server_ip: Union[str, int], action: str, **kwargs) -> None:
|
|||
setup_logger(log_file)
|
||||
|
||||
# JWT validation and extracting user's information
|
||||
verify_jwt_in_request()
|
||||
claims = get_jwt()
|
||||
claims = get_jwt_token_claims()
|
||||
user_id = claims['user_id']
|
||||
user = user_sql.get_user_id(user_id=user_id)
|
||||
user_group = get_user_group()
|
||||
|
@ -200,8 +204,7 @@ def get_dick_permit(**kwargs):
|
|||
|
||||
|
||||
def get_users_params(**kwargs):
|
||||
verify_jwt_in_request()
|
||||
user_data = get_jwt()
|
||||
user_data = get_jwt_token_claims()
|
||||
|
||||
try:
|
||||
user_id = user_data['user_id']
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
import psutil
|
||||
import requests
|
||||
from flask import render_template, request
|
||||
from flask_jwt_extended import get_jwt
|
||||
from flask_jwt_extended import verify_jwt_in_request
|
||||
|
||||
import app.modules.db.sql as sql
|
||||
import app.modules.db.waf as waf_sql
|
||||
|
@ -41,8 +39,7 @@ def show_sub_ovw() -> str:
|
|||
|
||||
def show_overview(serv) -> str:
|
||||
servers = []
|
||||
verify_jwt_in_request()
|
||||
claims = get_jwt()
|
||||
claims = roxywi_common.get_jwt_token_claims()
|
||||
lang = roxywi_common.get_user_lang_for_flask()
|
||||
role = user_sql.get_user_role_in_group(claims['user_id'], claims['group'])
|
||||
server = server_sql.get_server_by_ip(serv)
|
||||
|
|
|
@ -2,8 +2,6 @@ from typing import Union
|
|||
|
||||
import requests
|
||||
from flask import render_template
|
||||
from flask_jwt_extended import get_jwt
|
||||
from flask_jwt_extended import verify_jwt_in_request
|
||||
|
||||
import app.modules.db.sql as sql
|
||||
import app.modules.db.user as user_sql
|
||||
|
@ -45,8 +43,7 @@ def is_protected(server_ip: str, action: str) -> None:
|
|||
:return: None
|
||||
:raises: Exception if the server is protected and the user role is not high enough.
|
||||
"""
|
||||
verify_jwt_in_request()
|
||||
claims = get_jwt()
|
||||
claims = roxywi_common.get_jwt_token_claims()
|
||||
user_role = user_sql.get_user_role_in_group(claims['user_id'], claims['group'])
|
||||
|
||||
if server_sql.is_serv_protected(server_ip) and int(user_role) > 2:
|
||||
|
|
Loading…
Reference in New Issue