mirror of https://github.com/Aidaho12/haproxy-wi
335 lines
12 KiB
Python
335 lines
12 KiB
Python
import base64
|
|
|
|
from flask.views import MethodView
|
|
from flask_pydantic import validate
|
|
from flask import jsonify, g
|
|
from flask_jwt_extended import jwt_required
|
|
|
|
import app.modules.db.cred as cred_sql
|
|
import app.modules.roxywi.common as roxywi_common
|
|
import app.modules.server.ssh as ssh_mod
|
|
from app.middleware import get_user_params, page_for_admin, check_group
|
|
from app.modules.db.db_model import Cred
|
|
from app.modules.roxywi.exception import RoxywiGroupMismatch, RoxywiResourceNotFound, RoxywiPermissionError
|
|
from app.modules.roxywi.class_models import BaseResponse, GroupQuery, CredRequest, CredUploadRequest
|
|
from app.modules.common.common_classes import SupportClass
|
|
|
|
|
|
class CredView(MethodView):
|
|
methods = ['GET', 'POST', 'PUT', 'DELETE', 'PATCH']
|
|
decorators = [jwt_required(), get_user_params(), page_for_admin(level=2), check_group()]
|
|
|
|
def __init__(self, is_api=False):
|
|
self.is_api = is_api
|
|
|
|
@staticmethod
|
|
@validate(query=GroupQuery)
|
|
def get(cred_id: int, query: GroupQuery):
|
|
"""
|
|
Retrieve credential information for a specific ID
|
|
---
|
|
tags:
|
|
- 'SSH credentials'
|
|
parameters:
|
|
- in: 'path'
|
|
name: 'cred_id'
|
|
description: 'ID of the credential to retrieve'
|
|
required: true
|
|
type: 'integer'
|
|
responses:
|
|
200:
|
|
description: 'Individual Credential Information'
|
|
schema:
|
|
type: 'object'
|
|
properties:
|
|
group_id:
|
|
type: 'integer'
|
|
description: 'Group ID the credential belongs to'
|
|
id:
|
|
type: 'integer'
|
|
description: 'Credential ID'
|
|
key_enabled:
|
|
type: 'integer'
|
|
description: 'Key status of the credential'
|
|
name:
|
|
type: 'string'
|
|
description: 'Name of the credential'
|
|
username:
|
|
type: 'string'
|
|
description: 'Username associated with the credential'
|
|
password:
|
|
type: 'string'
|
|
description: 'Password associated with the credential'
|
|
passphrase:
|
|
type: 'string'
|
|
description: 'Password for the SSH private key'
|
|
private_key:
|
|
type: 'string'
|
|
description: 'SSH private key in base64 encoded format'
|
|
shared:
|
|
type: 'integer'
|
|
description: 'Is shared credential for other groups or not'
|
|
404:
|
|
description: 'Credential not found'
|
|
"""
|
|
group_id = SupportClass.return_group_id(query)
|
|
try:
|
|
creds = ssh_mod.get_creds(group_id=group_id, cred_id=cred_id, not_shared=True)
|
|
return jsonify(creds), 200
|
|
except Exception as e:
|
|
return roxywi_common.handler_exceptions_for_json_data(e, 'Cannot get credentials')
|
|
|
|
@validate(body=CredRequest)
|
|
def post(self, body: CredRequest):
|
|
"""
|
|
Create a new credential entry
|
|
---
|
|
tags:
|
|
- SSH credentials
|
|
parameters:
|
|
- in: body
|
|
name: body
|
|
schema:
|
|
id: AddCredentials
|
|
required:
|
|
- group_шв
|
|
- name
|
|
- username
|
|
- key_enabled
|
|
- password
|
|
properties:
|
|
group_id:
|
|
type: integer
|
|
description: The ID of the group to create the credential for. Only for superAdmin role
|
|
name:
|
|
type: string
|
|
description: The credential name
|
|
username:
|
|
type: string
|
|
description: The username
|
|
key_enabled:
|
|
type: integer
|
|
description: If key is enabled or not
|
|
password:
|
|
type: string
|
|
description: The password
|
|
shared:
|
|
type: 'integer'
|
|
description: 'Is shared credential for other groups or not'
|
|
responses:
|
|
201:
|
|
description: Credential addition successful
|
|
"""
|
|
group_id = SupportClass.return_group_id(body)
|
|
try:
|
|
return ssh_mod.create_ssh_cred(body.name, body.password, group_id, body.username, body.key_enabled, self.is_api, body.shared)
|
|
except Exception as e:
|
|
return roxywi_common.handler_exceptions_for_json_data(e, 'Cannot create new cred')
|
|
|
|
@validate(body=CredRequest)
|
|
def put(self, cred_id: int, body: CredRequest):
|
|
"""
|
|
Update a credential entry
|
|
---
|
|
tags:
|
|
- SSH credentials
|
|
parameters:
|
|
- in: 'path'
|
|
name: 'cred_id'
|
|
description: 'ID of the credential to retrieve'
|
|
required: true
|
|
type: 'integer'
|
|
- in: body
|
|
name: body
|
|
schema:
|
|
id: UpdateCredentials
|
|
required:
|
|
- name
|
|
- username
|
|
- key_enabled
|
|
- password
|
|
properties:
|
|
group_id:
|
|
type: integer
|
|
description: The ID of the group to create the credential for. Only for superAdmin role
|
|
name:
|
|
type: string
|
|
description: The credential name
|
|
username:
|
|
type: string
|
|
description: The username
|
|
key_enabled:
|
|
type: integer
|
|
description: If key is enabled or not
|
|
password:
|
|
type: string
|
|
description: The password
|
|
shared:
|
|
type: 'integer'
|
|
description: 'Is shared credential for other groups or not'
|
|
responses:
|
|
201:
|
|
description: Credential update successful
|
|
"""
|
|
group_id = SupportClass.return_group_id(body)
|
|
ssh = self._get_ssh(cred_id)
|
|
if ssh.shared and g.user_params['role'] != 1 and int(group_id) != int(ssh.group_id):
|
|
return roxywi_common.handler_exceptions_for_json_data(RoxywiPermissionError(), 'You cannot change shared parameters')
|
|
try:
|
|
self._check_is_correct_group(cred_id)
|
|
except Exception as e:
|
|
return roxywi_common.handler_exceptions_for_json_data(e, '')
|
|
|
|
try:
|
|
ssh_mod.update_ssh_key(body, group_id, cred_id)
|
|
return BaseResponse().model_dump(mode='json'), 201
|
|
except Exception as e:
|
|
return roxywi_common.handler_exceptions_for_json_data(e, 'Cannot update SSH key')
|
|
|
|
def delete(self, cred_id: int):
|
|
"""
|
|
Delete a credential entry
|
|
---
|
|
tags:
|
|
- SSH credentials
|
|
parameters:
|
|
- in: 'path'
|
|
name: 'cred_id'
|
|
description: 'ID of the credential to retrieve'
|
|
required: true
|
|
type: 'integer'
|
|
responses:
|
|
204:
|
|
description: Credential deletion successful
|
|
"""
|
|
try:
|
|
self._check_is_correct_group(cred_id)
|
|
except Exception as e:
|
|
return roxywi_common.handler_exceptions_for_json_data(e, '')
|
|
|
|
try:
|
|
ssh_mod.delete_ssh_key(cred_id)
|
|
return BaseResponse().model_dump(mode='json'), 204
|
|
except Exception as e:
|
|
return roxywi_common.handler_exceptions_for_json_data(e, 'Cannot delete SSH key')
|
|
|
|
@validate(body=CredUploadRequest)
|
|
def patch(self, cred_id: int, body: CredUploadRequest):
|
|
"""
|
|
Upload an SSH private key
|
|
---
|
|
tags:
|
|
- SSH credentials
|
|
parameters:
|
|
- in: 'path'
|
|
name: 'cred_id'
|
|
description: 'ID of the credential to retrieve'
|
|
required: true
|
|
type: 'integer'
|
|
- in: body
|
|
name: body
|
|
schema:
|
|
id: UploadSSHKey
|
|
required:
|
|
- private_key
|
|
- passphrase
|
|
properties:
|
|
private_key:
|
|
type: string
|
|
description: The private key string or base64 encoded string
|
|
passphrase:
|
|
type: string
|
|
description: The passphrase
|
|
responses:
|
|
201:
|
|
description: SSH key upload successful
|
|
"""
|
|
try:
|
|
self._check_is_correct_group(cred_id)
|
|
except Exception as e:
|
|
return roxywi_common.handler_exceptions_for_json_data(e, ''), 404
|
|
try:
|
|
body.private_key = base64.b64decode(body.private_key).decode("ascii")
|
|
except Exception:
|
|
pass
|
|
try:
|
|
ssh_mod.upload_ssh_key(cred_id, body.private_key, body.passphrase)
|
|
return BaseResponse().model_dump(mode='json'), 201
|
|
except Exception as e:
|
|
return roxywi_common.handler_exceptions_for_json_data(e, 'Cannot upload SSH key')
|
|
|
|
def _check_is_correct_group(self, cred_id: int):
|
|
if g.user_params['role'] == 1:
|
|
return True
|
|
ssh = self._get_ssh(cred_id)
|
|
if int(ssh.group_id) != int(g.user_params['group_id']):
|
|
raise RoxywiGroupMismatch
|
|
|
|
@staticmethod
|
|
def _get_ssh(cred_id: int) -> Cred:
|
|
try:
|
|
return cred_sql.get_ssh(cred_id)
|
|
except RoxywiResourceNotFound:
|
|
raise RoxywiResourceNotFound
|
|
|
|
|
|
class CredsView(MethodView):
|
|
methods = ['GET']
|
|
decorators = [jwt_required(), get_user_params(), page_for_admin(level=2), check_group()]
|
|
|
|
@validate(query=GroupQuery)
|
|
def get(self, query: GroupQuery):
|
|
"""
|
|
Retrieve credential information based on group_id
|
|
---
|
|
tags:
|
|
- 'SSH credentials'
|
|
parameters:
|
|
- in: 'query'
|
|
name: 'group_id'
|
|
description: 'GroupQuery to filter servers. Only for superAdmin role'
|
|
required: false
|
|
type: 'integer'
|
|
responses:
|
|
200:
|
|
description: 'Credentials Information'
|
|
schema:
|
|
type: 'array'
|
|
items:
|
|
type: 'object'
|
|
properties:
|
|
group_id:
|
|
type: 'integer'
|
|
description: 'Group ID the credential belongs to'
|
|
id:
|
|
type: 'integer'
|
|
description: 'Credential ID'
|
|
key_enabled:
|
|
type: 'integer'
|
|
description: 'Key status of the credential'
|
|
name:
|
|
type: 'string'
|
|
description: 'Name of the credential'
|
|
username:
|
|
type: 'string'
|
|
description: 'Username of the credential'
|
|
password:
|
|
type: 'string'
|
|
description: 'Password associated with the credential'
|
|
passphrase:
|
|
type: 'string'
|
|
description: 'Password for the SSH private key'
|
|
private_key:
|
|
type: 'string'
|
|
description: 'SSH private key in base64 encoded format'
|
|
shared:
|
|
type: 'integer'
|
|
description: 'Is shared credential for other groups or not'
|
|
"""
|
|
group_id = SupportClass.return_group_id(query)
|
|
try:
|
|
creds = ssh_mod.get_creds(group_id=group_id)
|
|
return jsonify(creds), 200
|
|
except Exception as e:
|
|
return roxywi_common.handler_exceptions_for_json_data(e, 'Cannot get credentials')
|