Files
haproxy-wi/app/modules/db/user.py
Aidaho c56d105ef5 v7.2.5.0: Refactor execution of group changes in user settings
This commit streamlines the way user group data is handled, both in scripts and in server-side logic. The group settings URL has been simplified, and the logic for updating user groups has also been optimized. Eliminated unnecessary checks in the user.py module, and updated the routing logic for group data to use a single route with HTTP GET and PUT methods. Removed the errors shown for unsuccessful group changes.
2024-05-15 21:35:38 +03:00

448 lines
12 KiB
Python

from peewee import Case, JOIN
from app.modules.db.db_model import User, UserGroups, Groups, UUID, Token, ApiToken
from app.modules.db.sql import get_setting
from app.modules.db.common import out_error
import app.modules.roxy_wi_tools as roxy_wi_tools
def add_user(user, email, password, role, activeuser, group):
    """Insert a new row into the User table and return the insert result.

    When ``password`` equals the literal ``'aduser'`` the row is stored with
    ``ldap_user=1`` and no password column; otherwise the password is hashed
    with ``roxy_wi_tools.Tools.get_hash`` before being stored.

    Any exception (hashing or insert) is handed to ``out_error``; if that
    call returns, this function returns None.
    """
    is_ldap_account = password == 'aduser'
    try:
        if is_ldap_account:
            insert_query = User.insert(
                username=user, email=email, role=role, ldap_user=1,
                activeuser=activeuser, groups=group,
            )
        else:
            insert_query = User.insert(
                username=user, email=email,
                password=roxy_wi_tools.Tools.get_hash(password),
                role=role, activeuser=activeuser, groups=group,
            )
        new_id = insert_query.execute()
    except Exception as err:
        out_error(err)
    else:
        return new_id
def update_user(user, email, role, user_id, active_user):
    """Overwrite username, email, role and active flag of the user ``user_id``.

    Exceptions are handed to ``out_error``.
    """
    new_values = {
        'username': user,
        'email': email,
        'role': role,
        'activeuser': active_user,
    }
    try:
        statement = User.update(**new_values).where(User.user_id == user_id)
        statement.execute()
    except Exception as err:
        out_error(err)
def update_user_from_admin_area(user, email, user_id, active_user):
    """Overwrite username, email and active flag of the user ``user_id``.

    Unlike ``update_user`` the ``role`` column is left untouched.
    Exceptions are handed to ``out_error``.
    """
    new_values = {
        'username': user,
        'email': email,
        'activeuser': active_user,
    }
    try:
        statement = User.update(**new_values).where(User.user_id == user_id)
        statement.execute()
    except Exception as err:
        out_error(err)
def delete_user_groups(user_id):
    """Delete every UserGroups row belonging to ``user_id``.

    Returns True when the delete executed; on an exception the error is
    handed to ``out_error`` and, if that returns, False is returned.
    """
    removal = UserGroups.delete().where(UserGroups.user_id == user_id)
    try:
        removal.execute()
    except Exception as err:
        out_error(err)
        return False
    return True
def update_user_current_groups(groups, user_uuid):
    """Set ``User.groups`` for the user resolved from the session ``user_uuid``.

    The user id is looked up with ``get_user_id_by_uuid`` before the update.
    Exceptions raised by the update are handed to ``out_error``.
    """
    target_id = get_user_id_by_uuid(user_uuid)
    try:
        statement = User.update(groups=groups).where(User.user_id == target_id)
        statement.execute()
    except Exception as err:
        out_error(err)
def update_user_current_groups_by_id(groups, user_id):
    """Set ``User.groups`` for the user with the given ``user_id``.

    Exceptions are handed to ``out_error``.
    """
    try:
        User.update(groups=groups).where(User.user_id == user_id).execute()
    except Exception as err:
        out_error(err)
def update_user_password(password, user_id):
    """Hash ``password`` and store it in the ``password`` column of ``user_id``.

    Hashing uses ``roxy_wi_tools.Tools.get_hash``. Exceptions from hashing or
    from the update are handed to ``out_error``.
    """
    try:
        digest = roxy_wi_tools.Tools.get_hash(password)
        User.update(password=digest).where(User.user_id == user_id).execute()
    except Exception as err:
        out_error(err)
def delete_user(user_id):
    """Delete the User row and then its UserGroups rows.

    Returns True when both steps completed without raising; on an exception
    the error is handed to ``out_error`` and, if that returns, False is
    returned.
    """
    try:
        User.delete().where(User.user_id == user_id).execute()
        # Group memberships are removed only after the user row is gone.
        delete_user_groups(user_id)
    except Exception as err:
        out_error(err)
        return False
    return True
def update_user_role(user_id: int, group_id: int, role_id: int) -> None:
    """Insert a (user, group, role) row into UserGroups, replacing on conflict.

    Exceptions are handed to ``out_error``.
    """
    membership = {
        'user_id': user_id,
        'user_group_id': group_id,
        'user_role_id': role_id,
    }
    try:
        upsert = UserGroups.insert(**membership).on_conflict('replace')
        upsert.execute()
    except Exception as err:
        out_error(err)
def select_users(**kwargs):
    """Select users, filtered by the first matching keyword argument.

    Keyword arguments are checked in this order:
      * ``user``        -- match ``User.username`` (used when not None)
      * ``id``          -- match ``User.user_id`` (used when not None)
      * ``group``       -- users joined with UserGroups for that group id,
                           plus a computed ``last_login`` column (when not None)
      * ``by_group_id`` -- match ``User.groups`` (used only when truthy)
      * none of these   -- all users ordered by ``user_id``, plus the computed
                           ``last_login`` column

    Returns the executed query result. On an exception the error is handed
    to ``out_error`` and, if that returns, None is returned.
    """

    def _last_login_case():
        # CASE expression comparing last_login_date with the date produced
        # for timedelta_minutes_minus=15.
        # NOTE(review): presumably a "seen in the last 15 minutes" flag --
        # confirm against GetDate.return_date.
        get_date = roxy_wi_tools.GetDate(get_setting('time_zone'))
        cur_date = get_date.return_date('regular', timedelta_minutes_minus=15)
        return Case(0, [((User.last_login_date >= cur_date), 0)], 1).alias('last_login')

    username = kwargs.get("user")
    wanted_id = kwargs.get("id")
    group_id = kwargs.get("group")

    if username is not None:
        query = User.select().where(User.username == username)
    elif wanted_id is not None:
        query = User.select().where(User.user_id == wanted_id)
    elif group_id is not None:
        query = (
            User.select(User, UserGroups, _last_login_case())
            .join(UserGroups, on=(User.user_id == UserGroups.user_id))
            .where(UserGroups.user_group_id == group_id)
        )
    elif kwargs.get('by_group_id'):
        query = User.select().where(User.groups == kwargs.get("by_group_id"))
    else:
        query = User.select(User, _last_login_case()).order_by(User.user_id)

    try:
        rows = query.execute()
    except Exception as err:
        out_error(err)
    else:
        return rows
def is_user_active(user_id: int) -> int:
    """Return the ``activeuser`` column of ``user_id`` converted to int.

    Lookup errors are handed to ``out_error``; if that returns, None is
    returned.
    """
    try:
        user_row = User.get(User.user_id == user_id)
        active_flag = user_row.activeuser
    except Exception as err:
        out_error(err)
    else:
        return int(active_flag)
def check_user_group(user_id, group_id):
    """Tell whether a UserGroups row exists for ``user_id`` in ``group_id``.

    Returns False when the lookup raises for any reason (including no
    matching row); otherwise True unless the row's ``user_id`` equals ``''``.
    """
    membership_filter = (
        (UserGroups.user_id == user_id) &
        (UserGroups.user_group_id == group_id)
    )
    try:
        membership = UserGroups.get(membership_filter)
    except Exception:
        return False
    return bool(membership.user_id != '')
def select_user_groups_with_names(user_id, **kwargs):
    """Select group memberships together with group names.

    Modes, checked in order:
      * ``all`` is not None -- every UserGroups row joined with Groups
        (``user_id`` is not used for filtering);
      * ``user_not_in_group`` is not None -- Groups rows that have no
        UserGroups row for ``user_id`` (LEFT OUTER join filtered on NULL);
      * otherwise -- the memberships of ``user_id`` with role and group info.

    Returns the executed query result. On an exception the error is handed
    to ``out_error`` and, if that returns, None is returned.
    """
    same_group = (UserGroups.user_group_id == Groups.group_id)

    if kwargs.get("all") is not None:
        query = UserGroups.select(
            UserGroups.user_group_id, UserGroups.user_id, Groups.name, Groups.description
        ).join(Groups, on=same_group)
    elif kwargs.get("user_not_in_group") is not None:
        membership_of_user = same_group & (UserGroups.user_id == user_id)
        query = (
            Groups.select(Groups.group_id, Groups.name)
            .join(UserGroups, on=membership_of_user, join_type=JOIN.LEFT_OUTER)
            .group_by(Groups.name)
            # Keep only groups for which the outer join found no membership.
            .where(UserGroups.user_id.is_null(True))
        )
    else:
        query = (
            UserGroups.select(
                UserGroups.user_group_id, UserGroups.user_role_id, Groups.name, Groups.group_id
            )
            .join(Groups, on=same_group)
            .where(UserGroups.user_id == user_id)
        )

    try:
        rows = query.execute()
    except Exception as err:
        out_error(err)
    else:
        return rows
def select_user_roles_by_group(group_id: int):
    """Return all UserGroups rows whose ``user_group_id`` is ``group_id``.

    Exceptions are handed to ``out_error``; if that returns, None is returned.
    """
    try:
        in_group = UserGroups.select().where(UserGroups.user_group_id == group_id)
        rows = in_group.execute()
    except Exception as err:
        out_error(err)
    else:
        return rows
def select_users_roles():
    """Return every row of the UserGroups table.

    Exceptions are handed to ``out_error``; if that returns, None is returned.
    """
    try:
        rows = UserGroups.select().execute()
    except Exception as err:
        out_error(err)
    else:
        return rows
def update_last_act_user(uuid: str, token: str, ip: str) -> None:
    """Refresh session/token expiry and record the user's last activity.

    Three updates are executed in order:
      1. ``UUID.exp`` for the session ``uuid`` (offset by ``session_ttl``),
      2. ``Token.exp`` for ``token`` (offset by ``token_ttl``),
      3. ``User.last_login_date`` / ``last_login_ip`` for the session's user.

    Dates come from ``GetDate`` built with the ``time_zone`` setting. An
    exception in any update is handed to ``out_error``; later updates are
    then not attempted.
    """
    get_date = roxy_wi_tools.GetDate(get_setting('time_zone'))
    session_ttl = get_setting('session_ttl')
    token_ttl = get_setting('token_ttl')

    session_expiry = get_date.return_date('regular', timedelta=session_ttl)
    token_expiry = get_date.return_date('regular', timedelta=token_ttl)
    now = get_date.return_date('regular')

    user_id = get_user_id_by_uuid(uuid)

    statements = (
        UUID.update(exp=session_expiry).where(UUID.uuid == uuid),
        Token.update(exp=token_expiry).where(Token.token == token),
        User.update(last_login_date=now, last_login_ip=ip).where(User.user_id == user_id),
    )
    try:
        for statement in statements:
            statement.execute()
    except Exception as err:
        out_error(err)
def get_user_name_by_uuid(uuid):
    """Return the username of the user owning the session ``uuid``.

    Only the first matching row is used; None is returned when nothing
    matches. Query errors are handed to ``out_error``.
    """
    try:
        rows = (
            User.select(User.username)
            .join(UUID, on=(User.user_id == UUID.user_id))
            .where(UUID.uuid == uuid)
            .execute()
        )
    except Exception as err:
        out_error(err)
        return None
    first_row = next(iter(rows), None)
    return None if first_row is None else first_row.username
def get_user_id_by_uuid(uuid):
    """Return the ``user_id`` of the user owning the session ``uuid``.

    Only the first matching row is used; None is returned when nothing
    matches. Query errors are handed to ``out_error``.
    """
    try:
        rows = (
            User.select(User.user_id)
            .join(UUID, on=(User.user_id == UUID.user_id))
            .where(UUID.uuid == uuid)
            .execute()
        )
    except Exception as err:
        out_error(err)
        return None
    first_row = next(iter(rows), None)
    return None if first_row is None else first_row.user_id
def get_user_id_by_username(username: str):
    """Return the ``user_id`` of the user named ``username``.

    Lookup errors are handed to ``out_error``; if that returns, None is
    returned.
    """
    try:
        user_row = User.get(User.username == username)
        found_id = user_row.user_id
    except Exception as err:
        out_error(err)
    else:
        return found_id
def get_user_role_by_uuid(uuid, group_id):
    """Return the role id (as int) the session's user holds in ``group_id``.

    UserGroups is joined with UUID on ``user_id``; only the first matching
    row is used and None is returned when nothing matches. Query errors are
    handed to ``out_error``.
    """
    session_in_group = (UUID.uuid == uuid) & (UserGroups.user_group_id == group_id)
    try:
        rows = (
            UserGroups.select(UserGroups.user_role_id)
            .join(UUID, on=(UserGroups.user_id == UUID.user_id))
            .where(session_in_group)
            .execute()
        )
    except Exception as err:
        out_error(err)
        return None
    first_row = next(iter(rows), None)
    if first_row is None:
        return None
    return int(first_row.user_role_id)
def get_user_current_group_by_uuid(uuid):
    """Return ``User.groups`` (as int) for the user owning the session ``uuid``.

    Only the first matching row is used; None is returned when nothing
    matches. Query errors are handed to ``out_error``.
    """
    try:
        rows = (
            User.select(User.groups)
            .join(UUID, on=(User.user_id == UUID.user_id))
            .where((UUID.uuid == uuid))
            .execute()
        )
    except Exception as err:
        out_error(err)
        return None
    first_row = next(iter(rows), None)
    if first_row is None:
        return None
    return int(first_row.groups)
def write_user_uuid(login, user_uuid):
    """Store a session UUID for the user ``login`` with an expiry date.

    The expiry is the date returned for ``timedelta=session_ttl``.
    Insert errors are handed to ``out_error``.
    """
    session_ttl = get_setting('session_ttl')
    owner_id = get_user_id_by_username(login)
    # NOTE(review): GetDate() is built without the 'time_zone' setting here,
    # while update_last_act_user passes it -- confirm this is intended.
    expires_at = roxy_wi_tools.GetDate().return_date('regular', timedelta=session_ttl)
    session_row = {'user_id': owner_id, 'uuid': user_uuid, 'exp': expires_at}
    try:
        UUID.insert(**session_row).execute()
    except Exception as err:
        out_error(err)
def write_user_token(login, user_token):
    """Store a token for the user ``login`` with an expiry date.

    The expiry is the date returned for ``timedelta=token_ttl``.
    Insert errors are handed to ``out_error``.
    """
    token_ttl = get_setting('token_ttl')
    owner_id = get_user_id_by_username(login)
    # NOTE(review): GetDate() is built without the 'time_zone' setting here,
    # while update_last_act_user passes it -- confirm this is intended.
    expires_at = roxy_wi_tools.GetDate().return_date('regular', timedelta=token_ttl)
    token_row = {'user_id': owner_id, 'token': user_token, 'exp': expires_at}
    try:
        Token.insert(**token_row).execute()
    except Exception as err:
        out_error(err)
def select_user_services(user_id):
    """Return the ``user_services`` column of the user ``user_id``.

    Lookup errors are handed to ``out_error``; if that returns, None is
    returned.
    """
    try:
        user_row = User.get(User.user_id == user_id)
        services = user_row.user_services
    except Exception as err:
        out_error(err)
    else:
        return services
def update_user_services(services, user_id):
    """Store ``services`` in the ``user_services`` column of ``user_id``.

    Returns True when the update executed; on an exception the error is
    handed to ``out_error`` and, if that returns, False is returned.
    """
    try:
        statement = User.update(user_services=services).where(User.user_id == user_id)
        statement.execute()
    except Exception as err:
        out_error(err)
        return False
    return True
def get_super_admin_count() -> int:
    """Count the users that have role id 1 in at least one group.

    Rows are grouped by ``user_id`` so each user is counted once. A query
    error is handed to ``out_error`` (None is returned if that returns);
    an error while counting the rows is re-raised as a plain ``Exception``.
    """
    query = (
        UserGroups.select(UserGroups.user_id, UserGroups.user_role_id)
        .distinct()
        .where(UserGroups.user_role_id == 1)
        .group_by(UserGroups.user_id)
    )
    try:
        rows = query.execute()
    except Exception as err:
        out_error(err)
        return None
    try:
        return sum(1 for _ in rows)
    except Exception as e:
        raise Exception(f'error: {e}')
def select_users_emails_by_group_id(group_id: int):
    """Select the email column of non-guest users whose ``groups`` is ``group_id``.

    Returns the executed query result, or None when the query fails (the
    error is handed to ``out_error`` first).
    """
    not_guest_in_group = (User.groups == group_id) & (User.role != 'guest')
    query = User.select(User.email).where(not_guest_in_group)
    try:
        rows = query.execute()
    except Exception as err:
        out_error(err)
        return None
    return rows
def select_user_email_by_uuid(uuid: str) -> str:
    """Return the email of the user owning the session ``uuid``.

    The user id is resolved with ``get_user_id_by_uuid``. If the lookup
    raises, the error is handed to ``out_error`` and an empty string is
    returned.
    """
    owner_id = get_user_id_by_uuid(uuid)
    try:
        user_row = User.get(User.user_id == owner_id)
        email = user_row.email
    except Exception as err:
        out_error(err)
        return ""
    return email
def is_user_super_admin(user_id: int) -> bool:
    """Tell whether ``user_id`` has role id 1 in any of their groups.

    Every UserGroups row of the user is inspected, so a super-admin role in
    any group counts regardless of row order. A user without memberships
    yields False.

    A query error is handed to ``out_error``; if that call returns, False is
    returned so the result is always a bool.
    """
    query = UserGroups.select().where(UserGroups.user_id == user_id)
    try:
        memberships = query.execute()
    except Exception as e:
        out_error(e)
        return False
    # any() short-circuits on the first role-1 row and is False for no rows.
    return any(row.user_role_id == 1 for row in memberships)
def get_api_token(token):
    """Tell whether ``token`` exists in the ApiToken table.

    Returns:
        True when a row with this token is found, False when no such row
        exists. For any other failure the exception text is returned as a
        string, as before.

    Previously a missing token also returned the exception text, which is a
    non-empty (truthy) string and could be mistaken for a valid token by a
    caller testing truthiness.
    """
    try:
        stored = ApiToken.get(ApiToken.token == token)
    except ApiToken.DoesNotExist:
        # Unknown token: answer the predicate with an explicit False.
        return False
    except Exception as e:
        # NOTE(review): this string is still truthy -- callers must not treat
        # the result of a failed lookup as "token is valid".
        return str(e)
    return bool(token == stored.token)
def get_user_id_by_api_token(token):
    """Return the ``user_id`` of the user whose API token is ``token``.

    User is joined with ApiToken on the username. Only the first matching
    row is used; None is returned when nothing matches.

    NOTE(review): when the query raises, the exception text is returned as a
    string instead of an id -- callers need to be aware of that.
    """
    names_match = (ApiToken.user_name == User.username)
    query = (
        User.select(User.user_id)
        .join(ApiToken, on=names_match)
        .where(ApiToken.token == token)
    )
    try:
        rows = query.execute()
    except Exception as err:
        return str(err)
    first_row = next(iter(rows), None)
    return None if first_row is None else first_row.user_id
def write_api_token(user_token, group_id, user_role, user_name):
    """Insert a new ApiToken row with its creation and expiry dates.

    ``create_date`` is the current date and ``expire_date`` is the date
    returned for ``timedelta=token_ttl`` (the ``token_ttl`` setting converted
    to int). Previously both columns were computed with the TTL offset, so
    the creation date was stored equal to the expiry date.

    Insert errors are handed to ``out_error``.
    """
    token_ttl = int(get_setting('token_ttl'))
    get_date = roxy_wi_tools.GetDate()
    # No timedelta: the creation timestamp is "now", not "now + ttl".
    created_at = get_date.return_date('regular')
    expires_at = get_date.return_date('regular', timedelta=token_ttl)
    try:
        ApiToken.insert(
            token=user_token, user_name=user_name, user_group_id=group_id, user_role=user_role,
            create_date=created_at, expire_date=expires_at,
        ).execute()
    except Exception as e:
        out_error(e)
def get_username_group_id_from_api_token(token):
    """Return ``(user_name, user_group_id, user_role)`` for the API ``token``.

    NOTE(review): when the lookup raises (including no matching row), the
    exception text is returned as a single string instead of a tuple.
    """
    try:
        token_row = ApiToken.get(ApiToken.token == token)
    except Exception as err:
        return str(err)
    return token_row.user_name, token_row.user_group_id, token_row.user_role
def get_token(uuid):
    """Return one token of the user owning the session ``uuid``.

    Token is joined with UUID on ``user_id`` and limited to one row. Returns
    None when no row matches, and ``''`` when reading the row raises. A query
    error is handed to ``out_error``; if that returns, None is returned.
    """
    query = (
        Token.select()
        .join(UUID, on=(Token.user_id == UUID.user_id))
        .where(UUID.uuid == uuid)
        .limit(1)
    )
    try:
        rows = query.execute()
    except Exception as err:
        out_error(err)
        return None
    try:
        first_row = next(iter(rows), None)
        return None if first_row is None else first_row.token
    except Exception:
        return ''
def delete_old_uuid():
    """Delete UUID and Token rows whose ``exp`` is in the past or NULL.

    The UUID delete runs first, then the Token delete. An exception in
    either is handed to ``out_error``.
    """
    now = roxy_wi_tools.GetDate().return_date('regular')
    cleanups = (
        UUID.delete().where((UUID.exp < now) | (UUID.exp.is_null(True))),
        Token.delete().where((Token.exp < now) | (Token.exp.is_null(True))),
    )
    try:
        for cleanup in cleanups:
            cleanup.execute()
    except Exception as err:
        out_error(err)
def get_role_id(user_id: int, group_id: int) -> int:
    """Return the role id (as int) that ``user_id`` holds in ``group_id``.

    Lookup errors are handed to ``out_error``; if that returns, None is
    returned.
    """
    membership_filter = (
        (UserGroups.user_id == user_id) &
        (UserGroups.user_group_id == group_id)
    )
    try:
        membership = UserGroups.get(membership_filter)
    except Exception as err:
        out_error(err)
        return None
    return int(membership.user_role_id)
def get_user_id(user_id: int) -> int:
    """Fetch the User row whose ``user_id`` matches.

    NOTE(review): despite the name and the ``-> int`` annotation, the value
    returned is whatever ``User.get`` yields (the row object), not an id.

    Lookup errors are handed to ``out_error``; if that returns, None is
    returned.
    """
    try:
        user_row = User.get(User.user_id == user_id)
    except Exception as err:
        out_error(err)
    else:
        return user_row