You've already forked haproxy-wi
mirror of
https://github.com/roxy-wi/roxy-wi.git
synced 2025-12-18 12:04:07 +08:00
Removed the `check_version.html` template and moved version checking to `script.js`. This refactoring simplifies the codebase by consolidating version check handling into JavaScript, removing deprecated HTML templates, and enhancing the backend logic and API routes.
277 lines
8.0 KiB
Python
277 lines
8.0 KiB
Python
import os
|
|
import base64
|
|
from cryptography.fernet import Fernet
|
|
|
|
import paramiko
|
|
from flask import render_template
|
|
from playhouse.shortcuts import model_to_dict
|
|
|
|
import app.modules.db.cred as cred_sql
|
|
import app.modules.db.group as group_sql
|
|
import app.modules.db.server as server_sql
|
|
import app.modules.common.common as common
|
|
from app.modules.server import ssh_connection
|
|
import app.modules.roxywi.common as roxywi_common
|
|
import app.modules.roxy_wi_tools as roxy_wi_tools
|
|
from app.modules.roxywi.class_models import IdResponse, IdDataResponse
|
|
|
|
# Alias of `common.error_mess`; none of the functions visible in this part of the file reference it.
error_mess = common.error_mess
|
|
# Shared config reader: the functions below call get_config.get_config_var('main', ...)
# to read the `lib_path` and `secret_phrase` options.
get_config = roxy_wi_tools.GetConfigVar()
|
|
|
|
|
|
def return_ssh_keys_path(server_ip: str, **kwargs) -> dict:
    """
    Build the SSH connection settings for a server.

    Credentials are looked up by the ``id`` keyword argument when it is
    truthy, otherwise by ``server_ip``. Every value is stored with
    ``setdefault``, so the first credential returned wins; later credentials
    are still decrypted but never overwrite an existing entry.

    :param server_ip: server address used for the credential and port lookup
    :param kwargs: optional ``id`` of the SSH credential to use
    :return: dict with ``port`` and, when a credential was found, ``enabled``,
             ``user``, ``password``, ``key`` and ``passphrase``
    :raises Exception: when a secret cannot be decrypted or the port cannot be read
    """
    keys_root = get_config.get_config_var('main', 'lib_path')
    settings = {}

    cred_id = kwargs.get('id')
    if cred_id:
        credentials = cred_sql.select_ssh(id=cred_id)
    else:
        credentials = cred_sql.select_ssh(serv=server_ip)

    for cred in credentials:
        # Decrypt the stored secrets; falsy values are passed through untouched.
        secrets = {}
        for field in ('password', 'passphrase'):
            stored = getattr(cred, field)
            if stored:
                try:
                    stored = decrypt_password(stored)
                except Exception as e:
                    raise Exception(e)
            secrets[field] = stored

        # A key path is only reported when key authentication is enabled.
        key_path = f'{keys_root}/keys/{cred.name}.pem' if cred.key_enabled == 1 else ''

        for option, value in (
            ('enabled', cred.key_enabled),
            ('user', cred.username),
            ('password', secrets['password']),
            ('key', key_path),
            ('passphrase', secrets['passphrase']),
        ):
            settings.setdefault(option, value)

    try:
        # Element 10 of each server row is what gets reported as the SSH port.
        ports = [str(row[10]) for row in server_sql.select_servers(server=server_ip)]
        settings.setdefault('port', ports[0])
    except Exception as e:
        raise Exception(f'error: Cannot get SSH port: {e}')

    return settings
|
|
|
|
|
|
def ssh_connect(server_ip):
    """
    Create an SSH connection object for a server.

    :param server_ip: address of the server to connect to
    :return: ``ssh_connection.SshConnection`` built from the settings that
             ``return_ssh_keys_path`` resolves for this server
    """
    return ssh_connection.SshConnection(server_ip, return_ssh_keys_path(server_ip))
|
|
|
|
|
|
def create_ssh_cred(name: str, password: str, group: int, username: str, enable: int, is_api: int) -> dict:
    """
    Create new SSH credentials and return a JSON-serialisable response.

    The stored name is ``<name>_<group name>``. A password that is empty or
    the literal ``''`` is stored as an empty string; any other password is
    encrypted first.

    :param name: credential name without the group suffix
    :param password: plain password, may be empty
    :param group: group id the credentials belong to
    :param username: SSH user name
    :param enable: key-enabled flag passed through to the database layer
    :param is_api: truthy to return only the new id, falsy to also render the HTML row
    :return: dumped ``IdResponse`` / ``IdDataResponse``, or the result of
             ``handle_json_exceptions`` when the insert fails
    :raises Exception: when the password cannot be encrypted
    """
    group_name = group_sql.get_group_name_by_id(group)
    lang = roxywi_common.get_user_lang_for_flask()
    name = f'{name}_{group_name}'

    if not password or password == "''":
        password = ''
    else:
        try:
            password = crypt_password(password)
        except Exception as e:
            raise Exception(e)

    try:
        last_id = cred_sql.insert_new_ssh(name, enable, group, username, password)
    except Exception as e:
        return roxywi_common.handle_json_exceptions(e, 'Cannot create new SSH credentials')

    roxywi_common.logging('RMON server', f'New SSH credentials {name} has been created', roxywi=1, login=1)

    if not is_api:
        # The web UI additionally needs the rendered row for the new credentials.
        data = render_template(
            'ajax/new_ssh.html',
            groups=group_sql.select_groups(),
            sshs=cred_sql.select_ssh(name=name),
            lang=lang,
        )
        return IdDataResponse(id=last_id, data=data).model_dump(mode='json')

    return IdResponse(id=last_id).model_dump(mode='json')
|
|
|
|
|
|
def create_ssh_cred_api(name: str, enable: str, group: str, username: str, password: str) -> bool:
    """
    Create SSH credentials from API input.

    All string inputs are passed through ``common.checkAjaxInput``. The stored
    name is ``<name>_<group name>``.

    :param name: credential name without the group suffix
    :param enable: key-enabled flag
    :param group: group id the credentials belong to
    :param username: SSH user name
    :param password: plain password, may be empty
    :return: True when the credentials were stored; False when the name or
             user name is missing, or when the insert failed and
             ``handle_exceptions`` did not raise
    :raises Exception: when the password cannot be encrypted
    """
    name = common.checkAjaxInput(name)
    username = common.checkAjaxInput(username)

    # Validate before the name is combined with the group name: once it has
    # been formatted into an f-string it can never be None again.
    if username is None or name is None:
        return False

    enable = common.checkAjaxInput(enable)
    password = common.checkAjaxInput(password)

    group_name = group_sql.get_group_name_by_id(group)
    name = f'{name}_{group_name}'

    # Same emptiness test as create_ssh_cred: an empty (or None) password and
    # the literal '' are stored as an empty string instead of being encrypted.
    if password and password != "''":
        try:
            password = crypt_password(password)
        except Exception as e:
            raise Exception(e) from e
    else:
        password = ''

    try:
        cred_sql.insert_new_ssh(name, enable, group, username, password)
        roxywi_common.logging('Roxy-WI server', f'New SSH credentials {name} has been created', roxywi=1)
        return True
    except Exception as e:
        roxywi_common.handle_exceptions(e, 'Roxy-WI server', f'Cannot create SSH credentials {name}', roxywi=1)

    # Only reached when handle_exceptions returns instead of raising.
    return False
|
|
|
|
|
|
def upload_ssh_key(ssh_id: int, key: str, passphrase: str) -> None:
    """
    Validate an uploaded private key, write it to the keys directory and
    store its (encrypted) passphrase.

    The key is written to ``<lib_path>/keys/<name>.pem`` with mode 0600.
    ``<name>`` is the credential name; a name without an underscore gets the
    group name appended unless a key file with the bare name already exists
    in the keys directory.

    :param ssh_id: id of the SSH credential the key belongs to
    :param key: private key text; single quotes are stripped before parsing
    :param passphrase: passphrase of the key, may be empty
    :raises Exception: when the key cannot be parsed or written, its mode
                       cannot be set, or the passphrase cannot be encrypted
                       or saved
    """
    key = key.replace("'", "")
    ssh = cred_sql.get_ssh(ssh_id)
    group_name = group_sql.get_group_name_by_id(ssh.group_id)
    lib_path = get_config.get_config_var('main', 'lib_path')
    full_dir = f'{lib_path}/keys/'
    name = ssh.name

    # Parse first so an invalid key or wrong passphrase never touches the disk.
    try:
        key = paramiko.pkey.load_private_key(key, password=passphrase)
    except Exception as e:
        raise Exception(e) from e

    # Look for an existing key inside the keys directory (not relative to the
    # current working directory) before deciding to add the group suffix.
    if '_' not in name and not os.path.isfile(f'{full_dir}{name}.pem'):
        name = f'{ssh.name}_{group_name}'

    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(full_dir, exist_ok=True)

    ssh_keys = f'{full_dir}{name}.pem'

    try:
        key.write_private_key_file(ssh_keys)
    except Exception as e:
        raise Exception(e) from e

    try:
        os.chmod(ssh_keys, 0o600)
    except OSError as e:
        # Log the readable error text rather than the bare errno in args[0].
        roxywi_common.logging('RMON server', str(e), roxywi=1)
        raise Exception(e) from e

    if passphrase:
        try:
            passphrase = crypt_password(passphrase)
        except Exception as e:
            raise Exception(e) from e
    else:
        passphrase = ''

    try:
        cred_sql.update_ssh_passphrase(ssh_id, passphrase)
    except Exception as e:
        raise Exception(e) from e

    roxywi_common.logging("RMON server", f"A new SSH cert has been uploaded {ssh_keys}", roxywi=1, login=1)
|
|
|
|
|
|
def update_ssh_key(ssh_id: int, name: str, password: str, enable: int, username: str, group: int) -> None:
    """
    Update stored SSH credentials and rename the key file to match the new name.

    When key authentication is enabled and the current key file exists, it is
    renamed to ``<lib_path>/keys/<name>.pem`` and set to mode 0600 before the
    database row is updated. If the database update then fails, the rename is
    undone so the file name keeps matching the stored credential name.

    :param ssh_id: id of the SSH credential to update
    :param name: new credential name (also the new key file base name)
    :param password: plain password; an empty string is passed through unencrypted
    :param enable: key-enabled flag
    :param username: SSH user name
    :param group: group id
    :raises Exception: when the password cannot be encrypted or the update fails
    """
    lib_path = get_config.get_config_var('main', 'lib_path')
    ssh = cred_sql.get_ssh(ssh_id)
    ssh_key_name = f'{lib_path}/keys/{ssh.name}.pem'
    new_ssh_key_name = f'{lib_path}/keys/{name}.pem'

    if password != '':
        try:
            password = crypt_password(password)
        except Exception as e:
            raise Exception(e) from e

    key_moved = False
    if ssh.key_enabled == 1 and os.path.isfile(ssh_key_name):
        os.rename(ssh_key_name, new_ssh_key_name)
        os.chmod(new_ssh_key_name, 0o600)
        key_moved = True

    record_updated = False
    try:
        cred_sql.update_ssh(ssh_id, name, enable, group, username, password)
        record_updated = True
        roxywi_common.logging('RMON server', f'The SSH credentials {name} has been updated ', roxywi=1, login=1)
    except Exception as e:
        if key_moved and not record_updated:
            # The row still carries the old name, so put the key file back.
            try:
                os.rename(new_ssh_key_name, ssh_key_name)
            except OSError:
                # Best-effort rollback; the original failure is reported below.
                pass
        raise Exception(e) from e
|
|
|
|
|
|
def delete_ssh_key(ssh_id) -> str:
    """
    Delete SSH credentials and, when key authentication was enabled, the key file.

    Removing the key file is best-effort: a missing file is ignored and any
    other failure is logged, but the credentials are deleted either way.

    :param ssh_id: id of the SSH credential to delete
    :return: ``'ok'`` on success; on a database failure the error is handed to
             ``roxywi_common.handle_exceptions`` and nothing is returned here
    """
    lib_path = get_config.get_config_var('main', 'lib_path')
    name = ''
    ssh_enable = 0
    ssh_key_name = ''

    for sshs in cred_sql.select_ssh(id=ssh_id):
        ssh_enable = sshs.key_enabled
        name = sshs.name
        ssh_key_name = f'{lib_path}/keys/{sshs.name}.pem'

    if ssh_enable == 1:
        try:
            os.remove(ssh_key_name)
        except FileNotFoundError:
            # Nothing to clean up: the key file is already gone.
            pass
        except Exception as e:
            # Keep going, but leave a trace instead of silently dropping the error.
            roxywi_common.logging('Roxy-WI server', f'Cannot remove SSH key file {ssh_key_name}: {e}', roxywi=1)

    try:
        cred_sql.delete_ssh(ssh_id)
        roxywi_common.logging('Roxy-WI server', f'The SSH credentials {name} has deleted', roxywi=1, login=1)
        return 'ok'
    except Exception as e:
        roxywi_common.handle_exceptions(e, 'Roxy-WI server', f'Cannot delete SSH credentials {name}', roxywi=1, login=1)
|
|
|
|
|
|
def crypt_password(password: str) -> bytes:
    """
    Encrypt a plain password with Fernet.

    The Fernet key is the ``secret_phrase`` option of the ``main`` config section.

    :param password: plain password
    :return: encrypted token
    :raises Exception: when encryption fails
    """
    secret = get_config.get_config_var('main', 'secret_phrase')
    cipher = Fernet(secret.encode())
    try:
        return cipher.encrypt(password.encode())
    except Exception as e:
        raise Exception(f'error: Cannot crypt password: {e}')
|
|
|
|
|
|
def decrypt_password(password: str) -> str:
    """
    Decrypt a password that was encrypted with ``crypt_password``.

    The Fernet key is the ``secret_phrase`` option of the ``main`` config section.

    :param password: encrypted token; ``str`` or ``bytes`` (``crypt_password``
                     returns ``bytes``, so both forms are accepted)
    :return: plain text
    :raises Exception: when the token cannot be decrypted
    """
    secret = get_config.get_config_var('main', 'secret_phrase')
    fernet = Fernet(secret.encode())
    try:
        # Only text needs encoding; a bytes token is handed to Fernet as is.
        token = password.encode() if isinstance(password, str) else password
        return fernet.decrypt(token).decode()
    except Exception as e:
        raise Exception(f'error: Cannot decrypt password: {e}') from e
|
|
|
|
|
|
def get_creds(group_id: int = None, cred_id: int = None) -> list:
    """
    Return SSH credentials as a list of dicts with decrypted secrets.

    Credentials are filtered by ``group_id`` and, only together with it, by
    ``cred_id``; without a ``group_id`` every credential is selected. Each
    dict is the model converted with ``model_to_dict``, plus ``private_key``
    holding the base64-encoded content of ``<lib_path>/keys/<name>.pem``
    (empty string when the file does not exist).

    :param group_id: optional group id to filter by
    :param cred_id: optional credential id, honoured only when ``group_id`` is set
    :return: list of credential dicts
    :raises Exception: when a stored password or passphrase cannot be decrypted
    """
    keys_root = get_config.get_config_var('main', 'lib_path')
    result = []

    # Translate the arguments into the keyword filters for select_ssh.
    lookup = {}
    if group_id:
        lookup['group'] = group_id
        if cred_id:
            lookup['cred_id'] = cred_id

    for record in cred_sql.select_ssh(**lookup):
        entry = model_to_dict(record)
        entry['name'] = entry['name'].replace("'", "")

        key_path = f'{keys_root}/keys/{entry["name"]}.pem'
        encoded_key = ''
        if os.path.isfile(key_path):
            with open(key_path, 'rb') as key_file:
                encoded_key = base64.b64encode(key_file.read()).decode('utf-8')
        entry['private_key'] = encoded_key

        # Empty secrets are returned as stored; everything else is decrypted.
        for secret_field in ('password', 'passphrase'):
            if entry[secret_field]:
                entry[secret_field] = decrypt_password(entry[secret_field])

        result.append(entry)

    return result
|