# jumpserver/apps/terminal/models/component/terminal.py
#
# 163 lines
# 5.1 KiB
# Python

import time
import uuid
from django.conf import settings
from django.db import models
from django.utils.translation import ugettext_lazy as _
from common.const.signals import SKIP_SIGNAL
from common.utils import get_logger, lazyproperty
from orgs.utils import tmp_to_root_org
from terminal.const import TerminalType as TypeChoices
from users.models import User
from ..session import Session
# Module-level logger obtained from the project's ``get_logger`` helper,
# which is given this file's path.
logger = get_logger(__file__)
class TerminalStatusMixin:
    """Status helpers built on top of a ``status_set`` manager.

    The concrete class is expected to provide ``id`` and ``status_set``;
    they are only annotated here. Both ``last_stat`` and ``load`` use the
    project's ``lazyproperty`` decorator (presumably a caching property --
    confirm in ``common.utils``).
    """

    # Annotation only; supplied by the concrete class.
    id: str
    # Key template with one ``{}`` placeholder; not referenced anywhere in
    # the visible part of this file.
    ALIVE_KEY = 'TERMINAL_ALIVE_{}'
    # Annotation only; supplied by the concrete class.
    status_set: models.Manager

    @lazyproperty
    def last_stat(self):
        """Return the last entry of ``status_set`` ordered by ``date_created``."""
        stats_by_creation = self.status_set.order_by('date_created')
        return stats_by_creation.last()

    @lazyproperty
    def load(self):
        """Return ``ComputeLoadUtil.compute_load`` applied to ``last_stat``."""
        # Imported at call time rather than at module import time.
        from ...utils import ComputeLoadUtil
        return ComputeLoadUtil.compute_load(self.last_stat)

    @property
    def is_alive(self):
        """Tell whether the latest status record is less than 150 seconds old.

        Returns ``False`` when ``last_stat`` is falsy (no status record).
        """
        if self.last_stat:
            elapsed = time.time() - self.last_stat.date_created.timestamp()
            return elapsed < 150
        return False
class StorageMixin:
    """Resolve the storage names held on the instance into configuration.

    ``command_storage`` and ``replay_storage`` are names used to look up
    ``CommandStorage`` / ``ReplayStorage`` rows. When no row matches, the
    corresponding ``settings.DEFAULT_TERMINAL_*_STORAGE`` value is used.
    """

    # Annotations only; the concrete class supplies the values.
    command_storage: str
    replay_storage: str

    def get_command_storage(self):
        """Return the first ``CommandStorage`` named ``self.command_storage``."""
        # Imported at call time rather than at module import time.
        from .storage import CommandStorage
        matches = CommandStorage.objects.filter(name=self.command_storage)
        return matches.first()

    def get_command_storage_config(self):
        """Return the storage's ``valid_config`` or the default from settings."""
        storage = self.get_command_storage()
        if not storage:
            return settings.DEFAULT_TERMINAL_COMMAND_STORAGE
        return storage.valid_config

    def get_command_storage_setting(self):
        """Return the command storage config under ``TERMINAL_COMMAND_STORAGE``."""
        return {"TERMINAL_COMMAND_STORAGE": self.get_command_storage_config()}

    def get_replay_storage(self):
        """Return the first ``ReplayStorage`` named ``self.replay_storage``."""
        # Imported at call time rather than at module import time.
        from .storage import ReplayStorage
        matches = ReplayStorage.objects.filter(name=self.replay_storage)
        return matches.first()

    def get_replay_storage_config(self):
        """Return the storage's ``config`` or the default from settings."""
        storage = self.get_replay_storage()
        if not storage:
            return settings.DEFAULT_TERMINAL_REPLAY_STORAGE
        return storage.config

    def get_replay_storage_setting(self):
        """Return the replay storage config under ``TERMINAL_REPLAY_STORAGE``."""
        return {"TERMINAL_REPLAY_STORAGE": self.get_replay_storage_config()}
class Terminal(StorageMixin, TerminalStatusMixin, models.Model):
    """Terminal component record, stored in the ``terminal`` table.

    A terminal is optionally tied to a ``User`` through the one-to-one
    ``user`` field; ``is_active`` is read from and written to that user.
    ``delete()`` is a soft delete: the row is kept and flagged with
    ``is_deleted``.
    """

    id = models.UUIDField(default=uuid.uuid4, primary_key=True)
    name = models.CharField(max_length=128, verbose_name=_('Name'))
    type = models.CharField(
        choices=TypeChoices.choices,
        default=TypeChoices.koko,
        max_length=64,
        verbose_name=_('type'),
    )
    remote_addr = models.CharField(max_length=128, blank=True, verbose_name=_('Remote Address'))
    # Storage *names*; StorageMixin resolves them to storage objects.
    command_storage = models.CharField(max_length=128, verbose_name=_("Command storage"), default='default')
    replay_storage = models.CharField(max_length=128, verbose_name=_("Replay storage"), default='default')
    user = models.OneToOneField(
        User,
        related_name='terminal',
        verbose_name=_('Application User'),
        null=True,
        on_delete=models.CASCADE,
    )
    is_deleted = models.BooleanField(default=False)
    date_created = models.DateTimeField(auto_now_add=True)
    comment = models.TextField(blank=True, verbose_name=_('Comment'))

    @property
    def is_active(self):
        """``True`` only when a user is attached and that user is active."""
        account = self.user
        return bool(account and account.is_active)

    @is_active.setter
    def is_active(self, active):
        """Store ``active`` on the attached user and save it; no-op without a user."""
        if not self.user:
            return
        self.user.is_active = active
        self.user.save()

    def get_online_sessions(self):
        """Return the unfinished ``Session`` queryset for this terminal.

        The queryset is built inside ``tmp_to_root_org()``.
        """
        with tmp_to_root_org():
            return Session.objects.filter(
                terminal=self,
                is_finished=False,
            )

    def get_online_session_count(self):
        """Return the number of unfinished sessions for this terminal."""
        online = self.get_online_sessions()
        return online.count()

    @staticmethod
    def get_login_title_setting():
        """Return the login title under the ``TERMINAL_HEADER_TITLE`` key."""
        # Imported at call time rather than at module import time.
        from settings.utils import get_login_title
        return {'TERMINAL_HEADER_TITLE': get_login_title()}

    @property
    def config(self):
        """Build the configuration dict for this terminal.

        Starts from every ``settings`` attribute whose name begins with
        ``TERMINAL``, then overlays the command storage, replay storage and
        login title settings, and finally two ``SECURITY_*`` values.
        """
        configs = {
            key: getattr(settings, key)
            for key in dir(settings)
            if key.startswith('TERMINAL')
        }
        # Later updates win over earlier keys, so the order is significant.
        configs.update(self.get_command_storage_setting())
        configs.update(self.get_replay_storage_setting())
        configs.update(self.get_login_title_setting())
        configs.update(
            SECURITY_MAX_IDLE_TIME=settings.SECURITY_MAX_IDLE_TIME,
            SECURITY_SESSION_SHARE=settings.SECURITY_SESSION_SHARE,
        )
        return configs

    @property
    def service_account(self):
        """Alias for ``user``."""
        return self.user

    def delete(self, using=None, keep_parents=False):
        """Soft-delete the terminal.

        Deletes the attached user (after setting the ``SKIP_SIGNAL``
        attribute on it), detaches it, sets ``is_deleted`` and saves the
        terminal. ``using`` and ``keep_parents`` are accepted but not used.
        Returns ``None``.
        """
        account = self.user
        if account:
            # Flag the user object before deleting it.
            setattr(account, SKIP_SIGNAL, True)
            account.delete()
        self.user = None
        self.is_deleted = True
        self.save()

    def __str__(self):
        """Return ``"<name>: <status>"``.

        Status precedence: Deleted, then Disable, then Offline, else Active.
        """
        if self.is_deleted:
            status = "Deleted"
        elif not self.is_active:
            status = "Disable"
        elif not self.is_alive:
            status = 'Offline'
        else:
            status = "Active"
        return '%s: %s' % (self.name, status)

    class Meta:
        db_table = "terminal"
        verbose_name = _("Terminal")
        permissions = (
            ('view_terminalconfig', _('Can view terminal config')),
        )