fix: 特权操作时 同时加载密钥密码

pull/10653/head
feng 2023-06-07 15:02:11 +08:00 committed by Jiangjie.Bai
parent a10bb29a1e
commit 4a1f3ed727
2 changed files with 69 additions and 22 deletions

View File

@ -2,9 +2,8 @@
# -*- coding: utf-8 -*-
#
import uuid
import logging
from functools import reduce
import uuid
from collections import OrderedDict
from django.db import models
@ -14,7 +13,6 @@ from rest_framework.exceptions import ValidationError
from common.db.fields import JsonDictTextField
from common.utils import lazyproperty
from orgs.mixins.models import OrgModelMixin, OrgManager
from .base import AbsConnectivity
__all__ = ['Asset', 'ProtocolsMixin', 'Platform', 'AssetQuerySet']
@ -294,7 +292,7 @@ class Asset(AbsConnectivity, AbsHardwareInfo, ProtocolsMixin, NodesRelationMixin
auth_user = self.admin_user
become_user = None
auth_user.load_asset_special_auth(self)
auth_user.load_asset_special_auth(self, run_as_admin=True)
info = {
'username': auth_user.username,
'password': auth_user.password,

View File

@ -4,16 +4,16 @@
import logging
from django.db import models
from django.utils.translation import ugettext_lazy as _
from django.core.validators import MinValueValidator, MaxValueValidator
from django.core.cache import cache
from django.core.validators import MinValueValidator, MaxValueValidator
from django.db import models
from django.db.models import F, Q
from django.utils.translation import ugettext_lazy as _
from common.utils import signer, get_object_or_none, is_uuid
from .base import BaseUser
from .asset import Asset
from .authbook import AuthBook
from .base import BaseUser
__all__ = ['AdminUser', 'SystemUser']
logger = logging.getLogger(__name__)
@ -166,7 +166,44 @@ class AuthMixin:
if asset:
self.load_asset_more_auth(asset_id=asset.id, username=username, user_id=user_id)
def load_asset_special_auth(self, asset, username=''):
@staticmethod
def _load_run_as_admin_auth(authbook, not_stu_qs, stu_qs):
if not authbook.password:
password_query = not_stu_qs.values('password')
password = next((i['password'] for i in password_query if i['password']), None)
if not password:
password_query = stu_qs.values('password', stu_password=F('systemuser__password'))
password = next(
(
i['password'] or i['stu_password'] for i in password_query if
i['password'] or i['stu_password']
), ''
)
authbook.password = password
elif not authbook.private_key:
key_query = not_stu_qs.values('private_key', 'public_key')
key_data = next((i for i in key_query if i['private_key']), None)
if not key_data:
key_query = stu_qs.values(
'private_key', 'public_key',
stu_private_key=F('systemuser__private_key'),
stu_public_key=F('systemuser__public_key')
)
key_data = next(
(
{
'private_key': i['private_key'] or i['stu_private_key'],
'public_key': i['public_key'] or i['stu_public_key']
} for i in key_query if i['private_key'] or i['stu_private_key']
), {'private_key': '', 'public_key': ''}
)
authbook.private_key = key_data['private_key']
authbook.public_key = key_data['public_key']
def load_asset_special_auth(self, asset, username='', run_as_admin=False):
"""
AuthBook 的数据状态
| asset | systemuser | username |
@ -187,19 +224,22 @@ class AuthMixin:
if username == '':
username = self.username
authbook = AuthBook.objects.filter(
asset=asset, username=username, systemuser__isnull=True
).order_by('-date_created').first()
not_stu_query = Q(asset=asset, username=username, systemuser__isnull=True)
stu_query = Q(asset=asset, systemuser=self)
not_stu_qs = AuthBook.objects.filter(not_stu_query).order_by('-date_created')
stu_qs = AuthBook.objects.filter(stu_query).order_by('-date_created')
authbook = not_stu_qs.first()
if not authbook:
authbook = AuthBook.objects.filter(
asset=asset, systemuser=self
).order_by('-date_created').first()
authbook = stu_qs.first()
if not authbook:
return None
authbook.load_auth()
if run_as_admin:
self._load_run_as_admin_auth(authbook, not_stu_qs, stu_qs)
self.password = authbook.password
self.private_key = authbook.private_key
self.public_key = authbook.public_key
@ -249,12 +289,19 @@ class SystemUser(ProtocolMixin, AuthMixin, BaseUser):
users = models.ManyToManyField('users.User', blank=True, verbose_name=_("Users"))
groups = models.ManyToManyField('users.UserGroup', blank=True, verbose_name=_("User groups"))
type = models.CharField(max_length=16, choices=Type.choices, default=Type.common, verbose_name=_('Type'))
priority = models.IntegerField(default=81, verbose_name=_("Priority"), help_text=_("1-100, the lower the value will be match first"), validators=[MinValueValidator(1), MaxValueValidator(100)])
protocol = models.CharField(max_length=16, choices=ProtocolMixin.Protocol.choices, default='ssh', verbose_name=_('Protocol'))
priority = models.IntegerField(
default=81, verbose_name=_("Priority"),
help_text=_("1-100, the lower the value will be match first"),
validators=[MinValueValidator(1), MaxValueValidator(100)]
)
protocol = models.CharField(max_length=16, choices=ProtocolMixin.Protocol.choices, default='ssh',
verbose_name=_('Protocol'))
auto_push = models.BooleanField(default=True, verbose_name=_('Auto push'))
sudo = models.TextField(default='/bin/whoami', verbose_name=_('Sudo'))
shell = models.CharField(max_length=64, default='/bin/bash', verbose_name=_('Shell'))
login_mode = models.CharField(choices=LOGIN_MODE_CHOICES, default=LOGIN_AUTO, max_length=10, verbose_name=_('Login mode'))
shell = models.CharField(max_length=64, default='/bin/bash', verbose_name=_('Shell'))
login_mode = models.CharField(
choices=LOGIN_MODE_CHOICES, default=LOGIN_AUTO, max_length=10, verbose_name=_('Login mode')
)
sftp_root = models.CharField(default='tmp', max_length=128, verbose_name=_("SFTP Root"))
token = models.TextField(default='', verbose_name=_('Token'))
home = models.CharField(max_length=4096, default='', verbose_name=_('Home'), blank=True)
@ -262,7 +309,9 @@ class SystemUser(ProtocolMixin, AuthMixin, BaseUser):
ad_domain = models.CharField(default='', max_length=256)
# linux su 命令 (switch user)
su_enabled = models.BooleanField(default=False, verbose_name=_('User switch'))
su_from = models.ForeignKey('self', on_delete=models.SET_NULL, related_name='su_to', null=True, verbose_name=_("Switch from"))
su_from = models.ForeignKey(
'self', on_delete=models.SET_NULL, related_name='su_to', null=True, verbose_name=_("Switch from")
)
def __str__(self):
username = self.username