mirror of https://github.com/jumpserver/jumpserver
				
				
				
			
		
			
				
	
	
		
			251 lines
		
	
	
		
			8.6 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			251 lines
		
	
	
		
			8.6 KiB
		
	
	
	
		
			Python
		
	
	
import json
 | 
						||
import os
 | 
						||
import shutil
 | 
						||
 | 
						||
from django.conf import settings
 | 
						||
from django.core.files.base import ContentFile
 | 
						||
from django.core.files.storage import default_storage
 | 
						||
from django.core.files.uploadedfile import InMemoryUploadedFile
 | 
						||
from django.db import models, connections
 | 
						||
from django.db.utils import ProgrammingError, OperationalError
 | 
						||
from django.utils.translation import gettext_lazy as _
 | 
						||
from rest_framework.utils.encoders import JSONEncoder
 | 
						||
 | 
						||
from common.db.models import JMSBaseModel
 | 
						||
from common.db.utils import Encryptor
 | 
						||
from common.utils import get_logger
 | 
						||
from .const import ChatAITypeChoices
 | 
						||
from .signals import setting_changed
 | 
						||
 | 
						||
logger = get_logger(__name__)
 | 
						||
 | 
						||
 | 
						||
class SettingQuerySet(models.QuerySet):
 | 
						||
    def __getattr__(self, item):
 | 
						||
        queryset = list(self)
 | 
						||
        instances = [i for i in queryset if i.name == item]
 | 
						||
        if len(instances) == 1:
 | 
						||
            return instances[0]
 | 
						||
        else:
 | 
						||
            return Setting()
 | 
						||
 | 
						||
 | 
						||
class SettingManager(models.Manager):
 | 
						||
    def get_queryset(self):
 | 
						||
        return SettingQuerySet(self.model, using=self._db)
 | 
						||
 | 
						||
 | 
						||
class Setting(models.Model):
 | 
						||
    name = models.CharField(max_length=128, unique=True, verbose_name=_("Name"))
 | 
						||
    value = models.TextField(verbose_name=_("Value"), null=True, blank=True)
 | 
						||
    category = models.CharField(max_length=128, default="default", verbose_name=_('Category'))
 | 
						||
    encrypted = models.BooleanField(default=False, verbose_name=_('Encrypted'))
 | 
						||
    enabled = models.BooleanField(verbose_name=_("Enabled"), default=True)
 | 
						||
    comment = models.TextField(verbose_name=_("Comment"))
 | 
						||
 | 
						||
    objects = SettingManager()
 | 
						||
    cache_key_prefix = '_SETTING_'
 | 
						||
 | 
						||
    def __str__(self):
 | 
						||
        return self.name
 | 
						||
 | 
						||
    def is_name(self, name):
 | 
						||
        return self.name == name
 | 
						||
 | 
						||
    @property
 | 
						||
    def cleaned_value(self):
 | 
						||
        try:
 | 
						||
            value = self.value
 | 
						||
            if self.encrypted:
 | 
						||
                value = Encryptor(value).decrypt()
 | 
						||
            if not value:
 | 
						||
                return None
 | 
						||
            value = json.loads(value)
 | 
						||
            return value
 | 
						||
        except json.JSONDecodeError:
 | 
						||
            return None
 | 
						||
 | 
						||
    @cleaned_value.setter
 | 
						||
    def cleaned_value(self, item):
 | 
						||
        try:
 | 
						||
            v = json.dumps(item, cls=JSONEncoder)
 | 
						||
            if self.encrypted:
 | 
						||
                v = Encryptor(v).encrypt()
 | 
						||
            self.value = v
 | 
						||
        except json.JSONDecodeError as e:
 | 
						||
            raise ValueError("Json dump error: {}".format(str(e)))
 | 
						||
 | 
						||
    @classmethod
 | 
						||
    def refresh_all_settings(cls):
 | 
						||
        try:
 | 
						||
            settings_list = cls.objects.all()
 | 
						||
            for setting in settings_list:
 | 
						||
                setting.refresh_setting()
 | 
						||
        except (ProgrammingError, OperationalError):
 | 
						||
            pass
 | 
						||
 | 
						||
    @classmethod
 | 
						||
    def refresh_item(cls, name):
 | 
						||
        item = cls.objects.filter(name=name).first()
 | 
						||
        if not item:
 | 
						||
            return
 | 
						||
        item.refresh_setting()
 | 
						||
        setting_changed.send(sender=cls, name=name, item=item)
 | 
						||
 | 
						||
    def refresh_setting(self):
 | 
						||
        setattr(settings, self.name, self.cleaned_value)
 | 
						||
        self.refresh_keycloak_to_openid_if_need()
 | 
						||
 | 
						||
    def refresh_keycloak_to_openid_if_need(self):
 | 
						||
        watch_config_names = [
 | 
						||
            'AUTH_OPENID', 'AUTH_OPENID_REALM_NAME', 'AUTH_OPENID_SERVER_URL',
 | 
						||
            'AUTH_OPENID_PROVIDER_ENDPOINT', 'AUTH_OPENID_KEYCLOAK'
 | 
						||
        ]
 | 
						||
        if self.name not in watch_config_names:
 | 
						||
            # 不在监听的配置中, 不需要刷新
 | 
						||
            return
 | 
						||
        auth_keycloak = self.__class__.objects.filter(name='AUTH_OPENID_KEYCLOAK').first()
 | 
						||
        if not auth_keycloak or not auth_keycloak.cleaned_value:
 | 
						||
            # 关闭 Keycloak 方式的配置, 不需要刷新
 | 
						||
            return
 | 
						||
 | 
						||
        from jumpserver.conf import Config
 | 
						||
        config_names = [
 | 
						||
            'AUTH_OPENID', 'AUTH_OPENID_REALM_NAME',
 | 
						||
            'AUTH_OPENID_SERVER_URL', 'AUTH_OPENID_PROVIDER_ENDPOINT'
 | 
						||
        ]
 | 
						||
        # 获取当前 keycloak 配置
 | 
						||
        keycloak_config = {}
 | 
						||
        for name in config_names:
 | 
						||
            setting = self.__class__.objects.filter(name=name).first()
 | 
						||
            if not setting:
 | 
						||
                continue
 | 
						||
            value = setting.cleaned_value
 | 
						||
            keycloak_config[name] = value
 | 
						||
 | 
						||
        # 转化 keycloak 配置为 openid 配置
 | 
						||
        openid_config = Config.convert_keycloak_to_openid(keycloak_config)
 | 
						||
        if not openid_config:
 | 
						||
            return
 | 
						||
        # 刷新 settings
 | 
						||
        for key, value in openid_config.items():
 | 
						||
            setattr(settings, key, value)
 | 
						||
            self.__class__.update_or_create(key, value, encrypted=False, category=self.category)
 | 
						||
 | 
						||
    @classmethod
 | 
						||
    def save_to_file(cls, value: InMemoryUploadedFile):
 | 
						||
        filename = value.name
 | 
						||
        filepath = f'settings/{filename}'
 | 
						||
        path = default_storage.save(filepath, ContentFile(value.read()))
 | 
						||
        url = default_storage.url(path)
 | 
						||
        return url
 | 
						||
 | 
						||
    @classmethod
 | 
						||
    def update_or_create(cls, name='', value='', encrypted=False, category=''):
 | 
						||
        """
 | 
						||
        不能使用 Model 提供的,update_or_create 因为这里有 encrypted 和 cleaned_value
 | 
						||
        :return: (changed, instance)
 | 
						||
        """
 | 
						||
        setting = cls.objects.filter(name=name).first()
 | 
						||
        changed = False
 | 
						||
        if not setting:
 | 
						||
            setting = Setting(name=name, encrypted=encrypted, category=category)
 | 
						||
 | 
						||
        if isinstance(value, InMemoryUploadedFile):
 | 
						||
            value = cls.save_to_file(value)
 | 
						||
 | 
						||
        if setting.cleaned_value != value:
 | 
						||
            setting.encrypted = encrypted
 | 
						||
            setting.cleaned_value = value
 | 
						||
            setting.save()
 | 
						||
            changed = True
 | 
						||
        return changed, setting
 | 
						||
 | 
						||
    class Meta:
 | 
						||
        db_table = "settings_setting"
 | 
						||
        verbose_name = _("System setting")
 | 
						||
        permissions = [
 | 
						||
            ('change_email', _('Can change email setting')),
 | 
						||
            ('change_auth', _('Can change auth setting')),
 | 
						||
            ('change_ops', _('Can change auth ops')),
 | 
						||
            ('change_ticket', _('Can change auth ticket')),
 | 
						||
            ('change_virtualapp', _('Can change virtual app setting')),
 | 
						||
            ('change_announcement', _('Can change auth announcement')),
 | 
						||
            ('change_vault', _('Can change vault setting')),
 | 
						||
            ('change_chatai', _('Can change chat ai setting')),
 | 
						||
            ('change_systemmsgsubscription', _('Can change system msg sub setting')),
 | 
						||
            ('change_sms', _('Can change sms setting')),
 | 
						||
            ('change_security', _('Can change security setting')),
 | 
						||
            ('change_clean', _('Can change clean setting')),
 | 
						||
            ('change_interface', _('Can change interface setting')),
 | 
						||
            ('change_license', _('Can change license setting')),
 | 
						||
            ('change_terminal', _('Can change terminal setting')),
 | 
						||
            ('change_other', _('Can change other setting')),
 | 
						||
        ]
 | 
						||
 | 
						||
 | 
						||
class ChatPrompt(JMSBaseModel):
 | 
						||
    name = models.CharField(max_length=128, verbose_name=_('Name'), unique=True)
 | 
						||
    content = models.TextField(blank=False, null=False, verbose_name=_('Content'))
 | 
						||
    builtin = models.BooleanField(default=False, verbose_name=_('Builtin'))
 | 
						||
 | 
						||
    class Meta:
 | 
						||
        verbose_name = _("Chat prompt")
 | 
						||
 | 
						||
    def __str__(self):
 | 
						||
        return self.name
 | 
						||
 | 
						||
 | 
						||
def get_chatai_data():
 | 
						||
    data = {
 | 
						||
        'url': settings.GPT_BASE_URL,
 | 
						||
        'api_key': settings.GPT_API_KEY,
 | 
						||
        'proxy': settings.GPT_PROXY,
 | 
						||
        'model': settings.GPT_MODEL,
 | 
						||
    }
 | 
						||
    if settings.CHAT_AI_TYPE != ChatAITypeChoices.gpt:
 | 
						||
        data['url'] = settings.DEEPSEEK_BASE_URL
 | 
						||
        data['api_key'] = settings.DEEPSEEK_API_KEY
 | 
						||
        data['proxy'] = settings.DEEPSEEK_PROXY
 | 
						||
        data['model'] = settings.DEEPSEEK_MODEL
 | 
						||
 | 
						||
    return data
 | 
						||
 | 
						||
 | 
						||
def init_sqlite_db():
 | 
						||
    db_path = settings.LEAK_PASSWORD_DB_PATH
 | 
						||
    if not os.path.isfile(db_path):
 | 
						||
        # 这里处理一下历史数据,有可能用户 copy 了旧的文件到 目录下
 | 
						||
        src = os.path.join(settings.PROJECT_DIR, 'data', 'leak_passwords.db')
 | 
						||
        if not os.path.isfile(src):
 | 
						||
            src = os.path.join(
 | 
						||
                settings.APPS_DIR, 'accounts', 'automations',
 | 
						||
                'check_account', 'leak_passwords.db'
 | 
						||
            )
 | 
						||
 | 
						||
        shutil.copy(src, db_path)
 | 
						||
    logger.info(f'init sqlite db {db_path}')
 | 
						||
    return db_path
 | 
						||
 | 
						||
 | 
						||
def register_sqlite_connection():
 | 
						||
    connections.databases['sqlite'] = {
 | 
						||
        'ENGINE': 'django.db.backends.sqlite3',
 | 
						||
        'ATOMIC_REQUESTS': False,
 | 
						||
        'NAME': settings.LEAK_PASSWORD_DB_PATH,
 | 
						||
        'TIME_ZONE': None,
 | 
						||
        'CONN_HEALTH_CHECKS': False,
 | 
						||
        'CONN_MAX_AGE': 0,
 | 
						||
        'OPTIONS': {},
 | 
						||
        'AUTOCOMMIT': True,
 | 
						||
    }
 | 
						||
 | 
						||
 | 
						||
class LeakPasswords(models.Model):
 | 
						||
    id = models.AutoField(primary_key=True)
 | 
						||
    password = models.CharField(max_length=1024, verbose_name=_("Password"))
 | 
						||
 | 
						||
    class Meta:
 | 
						||
        db_table = 'passwords'
 | 
						||
        managed = False
 |