jumpserver/apps/terminal/models/session.py

248 lines
9.1 KiB
Python

from __future__ import unicode_literals
import os
import uuid
from django.db import models
from django.utils.translation import ugettext_lazy as _
from django.utils import timezone
from django.conf import settings
from django.core.files.storage import default_storage
from django.core.cache import cache
from assets.models import Asset
from assets.const import Protocol
from applications.models import Application
from users.models import User
from orgs.mixins.models import OrgModelMixin
from django.db.models import TextChoices
from common.utils import get_object_or_none, lazyproperty
from ..backends import get_multi_command_storage
class Session(OrgModelMixin):
class LOGIN_FROM(TextChoices):
ST = 'ST', 'SSH Terminal'
RT = 'RT', 'RDP Terminal'
WT = 'WT', 'Web Terminal'
DT = 'DT', 'DB Terminal'
id = models.UUIDField(default=uuid.uuid4, primary_key=True)
user = models.CharField(max_length=128, verbose_name=_("User"), db_index=True)
user_id = models.CharField(blank=True, default='', max_length=36, db_index=True)
asset = models.CharField(max_length=128, verbose_name=_("Asset"), db_index=True)
asset_id = models.CharField(blank=True, default='', max_length=36, db_index=True)
system_user = models.CharField(max_length=128, verbose_name=_("System user"), db_index=True)
system_user_id = models.CharField(blank=True, default='', max_length=36, db_index=True)
login_from = models.CharField(max_length=2, choices=LOGIN_FROM.choices, default="ST", verbose_name=_("Login from"))
remote_addr = models.CharField(max_length=128, verbose_name=_("Remote addr"), blank=True, null=True)
is_success = models.BooleanField(default=True, db_index=True)
is_finished = models.BooleanField(default=False, db_index=True)
has_replay = models.BooleanField(default=False, verbose_name=_("Replay"))
has_command = models.BooleanField(default=False, verbose_name=_("Command"))
terminal = models.ForeignKey('terminal.Terminal', null=True, on_delete=models.DO_NOTHING, db_constraint=False)
protocol = models.CharField(choices=Protocol.choices, default='ssh', max_length=16, db_index=True)
date_start = models.DateTimeField(verbose_name=_("Date start"), db_index=True, default=timezone.now)
date_end = models.DateTimeField(verbose_name=_("Date end"), null=True)
upload_to = 'replay'
ACTIVE_CACHE_KEY_PREFIX = 'SESSION_ACTIVE_{}'
SUFFIX_MAP = {1: '.gz', 2: '.replay.gz', 3: '.cast.gz'}
DEFAULT_SUFFIXES = ['.replay.gz', '.cast.gz', '.gz']
# Todo: 将来干掉 local_path, 使用 default storage 实现
def get_all_possible_local_path(self):
"""
获取所有可能的本地存储录像文件路径
:return:
"""
return [self.get_local_storage_path_by_suffix(suffix)
for suffix in self.SUFFIX_MAP.values()]
def get_all_possible_relative_path(self):
"""
获取所有可能的外部存储录像文件路径
:return:
"""
return [self.get_relative_path_by_suffix(suffix)
for suffix in self.SUFFIX_MAP.values()]
def get_local_storage_path_by_suffix(self, suffix='.cast.gz'):
"""
local_path: replay/2021-12-08/session_id.cast.gz
通过后缀名获取本地存储的录像文件路径
:param suffix: .cast.gz | '.replay.gz' | '.gz'
:return:
"""
rel_path = self.get_relative_path_by_suffix(suffix)
if suffix == '.gz':
# 兼容 v1 的版本
return rel_path
return os.path.join(self.upload_to, rel_path)
def get_relative_path_by_suffix(self, suffix='.cast.gz'):
"""
relative_path: 2021-12-08/session_id.cast.gz
通过后缀名获取外部存储录像文件路径
:param suffix: .cast.gz | '.replay.gz' | '.gz'
:return:
"""
date = self.date_start.strftime('%Y-%m-%d')
return os.path.join(date, str(self.id) + suffix)
def get_local_path_by_relative_path(self, rel_path):
"""
2021-12-08/session_id.cast.gz
:param rel_path:
:return: replay/2021-12-08/session_id.cast.gz
"""
return '{}/{}'.format(self.upload_to, rel_path)
def get_relative_path_by_local_path(self, local_path):
return local_path.replace('{}/'.format(self.upload_to), '')
def find_ok_relative_path_in_storage(self, storage):
session_paths = self.get_all_possible_relative_path()
for rel_path in session_paths:
if storage.exists(rel_path):
return rel_path
@property
def asset_obj(self):
return Asset.objects.get(id=self.asset_id)
@property
def user_obj(self):
return User.objects.get(id=self.user_id)
def can_replay(self):
return self.has_replay
@property
def can_join(self):
if self.is_finished:
return False
if self.login_from == self.LOGIN_FROM.RT:
return False
if Protocol in [
Protocol.SSH, Protocol.VNC, Protocol.RDP,
Protocol.TELNET, Protocol.K8S
]:
return True
else:
return False
@property
def can_terminate(self):
if self.is_finished:
return False
else:
return True
@lazyproperty
def terminal_display(self):
display = self.terminal.name if self.terminal else ''
return display
def save_replay_to_storage_with_version(self, f, version=2):
suffix = self.SUFFIX_MAP.get(version, '.cast.gz')
local_path = self.get_local_storage_path_by_suffix(suffix)
try:
name = default_storage.save(local_path, f)
except OSError as e:
return None, e
if settings.SERVER_REPLAY_STORAGE:
from ..tasks import upload_session_replay_to_external_storage
upload_session_replay_to_external_storage.delay(str(self.id))
return name, None
@classmethod
def set_sessions_active(cls, session_ids):
data = {cls.ACTIVE_CACHE_KEY_PREFIX.format(i): i for i in session_ids}
cache.set_many(data, timeout=5 * 60)
@classmethod
def get_active_sessions(cls):
return cls.objects.filter(is_finished=False)
def is_active(self):
key = self.ACTIVE_CACHE_KEY_PREFIX.format(self.id)
return bool(cache.get(key))
@property
def command_amount(self):
command_store = get_multi_command_storage()
return command_store.count(session=str(self.id))
@property
def login_from_display(self):
return self.get_login_from_display()
def get_asset_or_application(self):
instance = get_object_or_none(Asset, pk=self.asset_id)
if not instance:
instance = get_object_or_none(Application, pk=self.asset_id)
return instance
def get_target_ip(self):
instance = self.get_asset_or_application()
target_ip = instance.get_target_ip() if instance else ''
return target_ip
@classmethod
def generate_fake(cls, count=100, is_finished=True):
import random
from orgs.models import Organization
from users.models import User
from assets.models import Asset, SystemUser
from orgs.utils import get_current_org
from common.utils.random import random_datetime, random_ip
org = get_current_org()
if not org or org.is_root():
Organization.default().change_to()
i = 0
users = User.objects.all()[:100]
assets = Asset.objects.all()[:100]
system_users = SystemUser.objects.all()[:100]
while i < count:
user_random = random.choices(users, k=10)
assets_random = random.choices(assets, k=10)
system_users = random.choices(system_users, k=10)
ziped = zip(user_random, assets_random, system_users)
sessions = []
now = timezone.now()
month_ago = now - timezone.timedelta(days=30)
for user, asset, system_user in ziped:
ip = random_ip()
date_start = random_datetime(month_ago, now)
date_end = random_datetime(date_start, date_start + timezone.timedelta(hours=2))
data = dict(
user=str(user), user_id=user.id,
asset=str(asset), asset_id=asset.id,
system_user=str(system_user), system_user_id=system_user.id,
remote_addr=ip,
date_start=date_start,
date_end=date_end,
is_finished=is_finished,
)
sessions.append(Session(**data))
cls.objects.bulk_create(sessions)
i += 10
class Meta:
db_table = "terminal_session"
ordering = ["-date_start"]
verbose_name = _('Session record')
permissions = [
('monitor_session', _('Can monitor session')),
('share_session', _('Can share session')),
('terminate_session', _('Can terminate session')),
('validate_sessionactionperm', _('Can validate session action perm')),
]
def __str__(self):
return "{0.id} of {0.user} to {0.asset}".format(self)