v8.1.0.1: Refactor SSH permission checks and streamline database connection

Implement additional checks for shared SSH credential editing and deletion. Refactored database connection logic to use keyword arguments for improved readability and maintainability.
pull/399/head v8.1.0.1
Aidaho 2024-10-21 09:35:25 +03:00
parent 2fa880578a
commit 98fb3fb288
3 changed files with 46 additions and 10 deletions

View File

@ -21,12 +21,14 @@ class ReconnectMySQLDatabase(ReconnectMixin, MySQLDatabase):
def connect(get_migrator=None): def connect(get_migrator=None):
if mysql_enable == '1': if mysql_enable == '1':
mysql_user = get_config.get_config_var('mysql', 'mysql_user')
mysql_password = get_config.get_config_var('mysql', 'mysql_password')
mysql_db = get_config.get_config_var('mysql', 'mysql_db') mysql_db = get_config.get_config_var('mysql', 'mysql_db')
mysql_host = get_config.get_config_var('mysql', 'mysql_host') kwargs = {
mysql_port = get_config.get_config_var('mysql', 'mysql_port') "user": get_config.get_config_var('mysql', 'mysql_user'),
conn = ReconnectMySQLDatabase(mysql_db, user=mysql_user, password=mysql_password, host=mysql_host, port=int(mysql_port)) "password": get_config.get_config_var('mysql', 'mysql_password'),
"host": get_config.get_config_var('mysql', 'mysql_host'),
"port": int(get_config.get_config_var('mysql', 'mysql_port'))
}
conn = ReconnectMySQLDatabase(mysql_db, **kwargs)
migrator = MySQLMigrator(conn) migrator = MySQLMigrator(conn)
else: else:
db = "/var/lib/roxy-wi/roxy-wi.db" db = "/var/lib/roxy-wi/roxy-wi.db"

View File

@ -45,7 +45,11 @@
<br> <br>
</td> </td>
<td> <td>
{% if ssh.shared and g.user_params['group_id']|string() != ssh.group_id|string() %}
<!-- continue -->
{% else %}
<a class="delete" onclick="confirmDeleteSsh({{ssh.id}})" title="{{lang.words.delete|title()}} {{ssh.name}}" style="cursor: pointer;"></a> <a class="delete" onclick="confirmDeleteSsh({{ssh.id}})" title="{{lang.words.delete|title()}} {{ssh.name}}" style="cursor: pointer;"></a>
{% endif %}
</td> </td>
</tr> </tr>
{% if ssh.shared and g.user_params['group_id']|string() != ssh.group_id|string() %} {% if ssh.shared and g.user_params['group_id']|string() != ssh.group_id|string() %}

View File

@ -172,21 +172,25 @@ class CredView(MethodView):
description: Credential update successful description: Credential update successful
""" """
group_id = SupportClass.return_group_id(body) group_id = SupportClass.return_group_id(body)
ssh = self._get_ssh(cred_id)
if ssh.shared and g.user_params['role'] != 1 and int(group_id) != int(ssh.group_id):
return roxywi_common.handler_exceptions_for_json_data(RoxywiPermissionError(), 'You cannot change shared parameters')
try: try:
self._check_is_correct_group(cred_id) self._check_is_correct_group(cred_id)
except Exception as e: except Exception as e:
return roxywi_common.handler_exceptions_for_json_data(e, '') return roxywi_common.handler_exceptions_for_json_data(e, '')
try:
self._is_editing_shared_ssh(cred_id, g.user_params['group_id'])
except Exception as e:
return roxywi_common.handler_exceptions_for_json_data(e, '')
try: try:
ssh_mod.update_ssh_key(body, group_id, cred_id) ssh_mod.update_ssh_key(body, group_id, cred_id)
return BaseResponse().model_dump(mode='json'), 201 return BaseResponse().model_dump(mode='json'), 201
except Exception as e: except Exception as e:
return roxywi_common.handler_exceptions_for_json_data(e, 'Cannot update SSH key') return roxywi_common.handler_exceptions_for_json_data(e, 'Cannot update SSH key')
def delete(self, cred_id: int): @validate(query=GroupQuery)
def delete(self, cred_id: int, query: GroupQuery):
""" """
Delete a credential entry Delete a credential entry
--- ---
@ -198,15 +202,30 @@ class CredView(MethodView):
description: 'ID of the credential to retrieve' description: 'ID of the credential to retrieve'
required: true required: true
type: 'integer' type: 'integer'
- in: 'query'
name: 'group_id'
description: 'ID of the group to list users. For superAdmin only'
required: false
type: 'integer'
responses: responses:
204: 204:
description: Credential deletion successful description: Credential deletion successful
""" """
group_id = SupportClass.return_group_id(query)
try:
self._is_editing_shared_ssh(cred_id, group_id)
except Exception as e:
return roxywi_common.handler_exceptions_for_json_data(e, '')
try: try:
self._check_is_correct_group(cred_id) self._check_is_correct_group(cred_id)
except Exception as e: except Exception as e:
return roxywi_common.handler_exceptions_for_json_data(e, '') return roxywi_common.handler_exceptions_for_json_data(e, '')
try:
self._is_editing_shared_ssh(cred_id, g.user_params['group_id'])
except Exception as e:
return roxywi_common.handler_exceptions_for_json_data(e, '')
try: try:
ssh_mod.delete_ssh_key(cred_id) ssh_mod.delete_ssh_key(cred_id)
return BaseResponse().model_dump(mode='json'), 204 return BaseResponse().model_dump(mode='json'), 204
@ -247,7 +266,13 @@ class CredView(MethodView):
try: try:
self._check_is_correct_group(cred_id) self._check_is_correct_group(cred_id)
except Exception as e: except Exception as e:
return roxywi_common.handler_exceptions_for_json_data(e, ''), 404 return roxywi_common.handler_exceptions_for_json_data(e, '')
try:
self._is_editing_shared_ssh(cred_id, g.user_params['group_id'])
except Exception as e:
return roxywi_common.handler_exceptions_for_json_data(e, '')
try: try:
body.private_key = base64.b64decode(body.private_key).decode("ascii") body.private_key = base64.b64decode(body.private_key).decode("ascii")
except Exception: except Exception:
@ -272,6 +297,11 @@ class CredView(MethodView):
except RoxywiResourceNotFound: except RoxywiResourceNotFound:
raise RoxywiResourceNotFound raise RoxywiResourceNotFound
def _is_editing_shared_ssh(self, cred_id: int, group_id: int):
ssh = self._get_ssh(cred_id)
if ssh.shared and g.user_params['role'] != 1 and int(group_id) != int(ssh.group_id):
raise RoxywiPermissionError('You cannot change shared parameters')
class CredsView(MethodView): class CredsView(MethodView):
methods = ['GET'] methods = ['GET']