You've already forked haproxy-wi
mirror of
https://github.com/roxy-wi/roxy-wi.git
synced 2025-12-18 12:04:07 +08:00
@@ -13,14 +13,13 @@ import app.modules.common.common as common
|
||||
from app.modules.server import ssh_connection
|
||||
import app.modules.roxywi.common as roxywi_common
|
||||
import app.modules.roxy_wi_tools as roxy_wi_tools
|
||||
from app.modules.roxywi.class_models import IdResponse, IdDataResponse
|
||||
from app.modules.roxywi.class_models import IdResponse, IdDataResponse, CredRequest
|
||||
|
||||
error_mess = common.error_mess
|
||||
get_config = roxy_wi_tools.GetConfigVar()
|
||||
|
||||
|
||||
def return_ssh_keys_path(server_ip: str, **kwargs) -> dict:
|
||||
lib_path = get_config.get_config_var('main', 'lib_path')
|
||||
ssh_settings = {}
|
||||
if kwargs.get('id'):
|
||||
sshs = cred_sql.select_ssh(id=kwargs.get('id'))
|
||||
@@ -43,10 +42,10 @@ def return_ssh_keys_path(server_ip: str, **kwargs) -> dict:
|
||||
else:
|
||||
passphrase = ssh.passphrase
|
||||
|
||||
ssh_key = _return_correct_ssh_file(ssh)
|
||||
ssh_settings.setdefault('enabled', ssh.key_enabled)
|
||||
ssh_settings.setdefault('user', ssh.username)
|
||||
ssh_settings.setdefault('password', password)
|
||||
ssh_key = f'{lib_path}/keys/{ssh.name}.pem' if ssh.key_enabled == 1 else ''
|
||||
ssh_settings.setdefault('key', ssh_key)
|
||||
ssh_settings.setdefault('passphrase', passphrase)
|
||||
|
||||
@@ -66,10 +65,8 @@ def ssh_connect(server_ip):
|
||||
return ssh
|
||||
|
||||
|
||||
def create_ssh_cred(name: str, password: str, group: int, username: str, enable: int, is_api: int) -> dict:
|
||||
group_name = group_sql.get_group_name_by_id(group)
|
||||
def create_ssh_cred(name: str, password: str, group: int, username: str, enable: int, is_api: int, shared: int) -> dict:
|
||||
lang = roxywi_common.get_user_lang_for_flask()
|
||||
name = f'{name}_{group_name}'
|
||||
if password and password != "''":
|
||||
try:
|
||||
password = crypt_password(password)
|
||||
@@ -79,43 +76,20 @@ def create_ssh_cred(name: str, password: str, group: int, username: str, enable:
|
||||
password = ''
|
||||
|
||||
try:
|
||||
last_id = cred_sql.insert_new_ssh(name, enable, group, username, password)
|
||||
last_id = cred_sql.insert_new_ssh(name, enable, group, username, password, shared)
|
||||
except Exception as e:
|
||||
return roxywi_common.handle_json_exceptions(e, 'Cannot create new SSH credentials')
|
||||
roxywi_common.logging('RMON server', f'New SSH credentials {name} has been created', roxywi=1, login=1)
|
||||
roxywi_common.logging('Roxy-WI server', f'New SSH credentials {name} has been created', roxywi=1, login=1)
|
||||
|
||||
if is_api:
|
||||
return IdResponse(id=last_id).model_dump(mode='json')
|
||||
else:
|
||||
data = render_template('ajax/new_ssh.html', groups=group_sql.select_groups(), sshs=cred_sql.select_ssh(name=name), lang=lang)
|
||||
data = render_template('ajax/new_ssh.html',
|
||||
groups=group_sql.select_groups(), sshs=cred_sql.select_ssh(name=name), lang=lang, adding=1
|
||||
)
|
||||
return IdDataResponse(id=last_id, data=data).model_dump(mode='json')
|
||||
|
||||
|
||||
def create_ssh_cred_api(name: str, enable: str, group: str, username: str, password: str) -> bool:
|
||||
group_name = group_sql.get_group_name_by_id(group)
|
||||
name = common.checkAjaxInput(name)
|
||||
name = f'{name}_{group_name}'
|
||||
enable = common.checkAjaxInput(enable)
|
||||
username = common.checkAjaxInput(username)
|
||||
password = common.checkAjaxInput(password)
|
||||
|
||||
if password != '':
|
||||
try:
|
||||
password = crypt_password(password)
|
||||
except Exception as e:
|
||||
raise Exception(e)
|
||||
|
||||
if username is None or name is None:
|
||||
return False
|
||||
else:
|
||||
try:
|
||||
cred_sql.insert_new_ssh(name, enable, group, username, password)
|
||||
roxywi_common.logging('Roxy-WI server', f'New SSH credentials {name} has been created', roxywi=1)
|
||||
return True
|
||||
except Exception as e:
|
||||
roxywi_common.handle_exceptions(e, 'Roxy-WI server', f'Cannot create SSH credentials {name}', roxywi=1)
|
||||
|
||||
|
||||
def upload_ssh_key(ssh_id: int, key: str, passphrase: str) -> None:
|
||||
key = key.replace("'", "")
|
||||
ssh = cred_sql.get_ssh(ssh_id)
|
||||
@@ -123,37 +97,23 @@ def upload_ssh_key(ssh_id: int, key: str, passphrase: str) -> None:
|
||||
lib_path = get_config.get_config_var('main', 'lib_path')
|
||||
full_dir = f'{lib_path}/keys/'
|
||||
name = ssh.name
|
||||
ssh_keys = f'{name}.pem'
|
||||
ssh_keys = f'{full_dir}{name}_{group_name}.pem'
|
||||
|
||||
try:
|
||||
key = paramiko.pkey.load_private_key(key, password=passphrase)
|
||||
except Exception as e:
|
||||
raise Exception(e)
|
||||
if key != '':
|
||||
try:
|
||||
key = paramiko.pkey.load_private_key(key, password=passphrase)
|
||||
except Exception as e:
|
||||
raise Exception(e)
|
||||
|
||||
try:
|
||||
_ = name.split('_')[1]
|
||||
split_name = True
|
||||
except Exception:
|
||||
split_name = False
|
||||
try:
|
||||
key.write_private_key_file(ssh_keys)
|
||||
except Exception as e:
|
||||
raise Exception(e)
|
||||
|
||||
if not os.path.isfile(ssh_keys) and not split_name:
|
||||
name = f'{ssh.name}_{group_name}'
|
||||
|
||||
if not os.path.exists(full_dir):
|
||||
os.makedirs(full_dir)
|
||||
|
||||
ssh_keys = f'{full_dir}{name}.pem'
|
||||
|
||||
try:
|
||||
key.write_private_key_file(ssh_keys)
|
||||
except Exception as e:
|
||||
raise Exception(e)
|
||||
|
||||
try:
|
||||
os.chmod(ssh_keys, 0o600)
|
||||
except IOError as e:
|
||||
roxywi_common.logging('RMON server', e.args[0], roxywi=1)
|
||||
raise Exception(e)
|
||||
try:
|
||||
os.chmod(ssh_keys, 0o600)
|
||||
except IOError as e:
|
||||
raise Exception(e)
|
||||
|
||||
if passphrase:
|
||||
try:
|
||||
@@ -168,54 +128,48 @@ def upload_ssh_key(ssh_id: int, key: str, passphrase: str) -> None:
|
||||
except Exception as e:
|
||||
raise Exception(e)
|
||||
|
||||
roxywi_common.logging("RMON server", f"A new SSH cert has been uploaded {ssh_keys}", roxywi=1, login=1)
|
||||
roxywi_common.logging("Roxy-WI server", f"A new SSH cert has been uploaded {ssh_keys}", roxywi=1, login=1)
|
||||
|
||||
|
||||
def update_ssh_key(ssh_id: int, name: str, password: str, enable: int, username: str, group: int) -> None:
|
||||
lib_path = get_config.get_config_var('main', 'lib_path')
|
||||
def update_ssh_key(body: CredRequest, group_id: int, ssh_id: int) -> None:
|
||||
ssh = cred_sql.get_ssh(ssh_id)
|
||||
ssh_key_name = f'{lib_path}/keys/{ssh.name}.pem'
|
||||
new_ssh_key_name = f'{lib_path}/keys/{name}.pem'
|
||||
ssh_key_name = _return_correct_ssh_file(ssh)
|
||||
|
||||
if password != '':
|
||||
if body.password != '' and body.password is not None:
|
||||
try:
|
||||
password = crypt_password(password)
|
||||
body.password = crypt_password(body.password)
|
||||
except Exception as e:
|
||||
raise Exception(e)
|
||||
|
||||
if ssh.key_enabled == 1 and os.path.isfile(ssh_key_name):
|
||||
if os.path.isfile(ssh_key_name):
|
||||
new_ssh_key_name = _return_correct_ssh_file(body)
|
||||
os.rename(ssh_key_name, new_ssh_key_name)
|
||||
os.chmod(new_ssh_key_name, 0o600)
|
||||
|
||||
try:
|
||||
cred_sql.update_ssh(ssh_id, name, enable, group, username, password)
|
||||
roxywi_common.logging('RMON server', f'The SSH credentials {name} has been updated ', roxywi=1, login=1)
|
||||
cred_sql.update_ssh(ssh_id, body.name, body.key_enabled, group_id, body.username, body.password, body.shared)
|
||||
roxywi_common.logging('Roxy-WI server', f'The SSH credentials {body.name} has been updated ', roxywi=1, login=1)
|
||||
except Exception as e:
|
||||
raise Exception(e)
|
||||
|
||||
|
||||
def delete_ssh_key(ssh_id) -> str:
|
||||
lib_path = get_config.get_config_var('main', 'lib_path')
|
||||
def delete_ssh_key(ssh_id) -> None:
|
||||
name = ''
|
||||
ssh_enable = 0
|
||||
ssh_key_name = ''
|
||||
|
||||
for sshs in cred_sql.select_ssh(id=ssh_id):
|
||||
ssh_enable = sshs.key_enabled
|
||||
name = sshs.name
|
||||
ssh_key_name = f'{lib_path}/keys/{sshs.name}.pem'
|
||||
|
||||
if ssh_enable == 1:
|
||||
try:
|
||||
os.remove(ssh_key_name)
|
||||
except Exception:
|
||||
pass
|
||||
if sshs.key_enabled == 1:
|
||||
ssh_key_name = _return_correct_ssh_file(sshs)
|
||||
try:
|
||||
os.remove(ssh_key_name)
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
cred_sql.delete_ssh(ssh_id)
|
||||
roxywi_common.logging('Roxy-WI server', f'The SSH credentials {name} has deleted', roxywi=1, login=1)
|
||||
return 'ok'
|
||||
except Exception as e:
|
||||
roxywi_common.handle_exceptions(e, 'Roxy-WI server', f'Cannot delete SSH credentials {name}', roxywi=1, login=1)
|
||||
raise e
|
||||
|
||||
|
||||
def crypt_password(password: str) -> bytes:
|
||||
@@ -248,12 +202,11 @@ def decrypt_password(password: str) -> str:
|
||||
return decryp_pass
|
||||
|
||||
|
||||
def get_creds(group_id: int = None, cred_id: int = None) -> list:
|
||||
def get_creds(group_id: int = None, cred_id: int = None, not_shared: bool = False) -> list:
|
||||
json_data = []
|
||||
lib_path = get_config.get_config_var('main', 'lib_path')
|
||||
|
||||
if group_id and cred_id:
|
||||
creds = cred_sql.select_ssh(group=group_id, cred_id=cred_id)
|
||||
creds = cred_sql.select_ssh(group=group_id, cred_id=cred_id, not_shared=not_shared)
|
||||
elif group_id:
|
||||
creds = cred_sql.select_ssh(group=group_id)
|
||||
else:
|
||||
@@ -262,15 +215,33 @@ def get_creds(group_id: int = None, cred_id: int = None) -> list:
|
||||
for cred in creds:
|
||||
cred_dict = model_to_dict(cred)
|
||||
cred_dict['name'] = cred_dict['name'].replace("'", "")
|
||||
ssh_key_file = f'{lib_path}/keys/{cred_dict["name"]}.pem'
|
||||
if os.path.isfile(ssh_key_file):
|
||||
with open(ssh_key_file, 'rb') as key:
|
||||
cred_dict['private_key'] = base64.b64encode(key.read()).decode('utf-8')
|
||||
|
||||
if cred.key_enabled == 1:
|
||||
ssh_key_file = _return_correct_ssh_file(cred)
|
||||
if os.path.isfile(ssh_key_file):
|
||||
with open(ssh_key_file, 'rb') as key:
|
||||
cred_dict['private_key'] = base64.b64encode(key.read()).decode('utf-8')
|
||||
else:
|
||||
cred_dict['private_key'] = ''
|
||||
else:
|
||||
cred_dict['private_key'] = ''
|
||||
if cred_dict['password']:
|
||||
cred_dict['password'] = decrypt_password(cred_dict['password'])
|
||||
try:
|
||||
cred_dict['password'] = decrypt_password(cred_dict['password'])
|
||||
except Exception:
|
||||
pass
|
||||
if cred_dict['passphrase']:
|
||||
cred_dict['passphrase'] = decrypt_password(cred_dict['passphrase'])
|
||||
json_data.append(cred_dict)
|
||||
return json_data
|
||||
|
||||
|
||||
def _return_correct_ssh_file(cred: CredRequest) -> str:
|
||||
lib_path = get_config.get_config_var('main', 'lib_path')
|
||||
group_name = group_sql.get_group_name_by_id(cred.group_id)
|
||||
# if not cred.key_enabled:
|
||||
# return ''
|
||||
if group_name not in cred.name:
|
||||
return f'{lib_path}/keys/{cred.name}_{group_name}.pem'
|
||||
else:
|
||||
return f'{lib_path}/keys/{cred.name}.pem'
|
||||
|
||||
Reference in New Issue
Block a user