mirror of https://github.com/jumpserver/jumpserver
feat: 优化缓存,将会话的缓存拿出来
parent
3ae976c183
commit
35941ddf7f
|
@ -126,7 +126,7 @@ class Cache(metaclass=CacheType):
|
|||
return data
|
||||
|
||||
def save_data_to_db(self, data):
|
||||
logger.info(f'Set data to cache: key={self.key} data={data}')
|
||||
logger.debug(f'Set data to cache: key={self.key} data={data}')
|
||||
self.redis.hset(self.key, mapping=data)
|
||||
self.load_data_from_db()
|
||||
|
||||
|
@ -143,10 +143,10 @@ class Cache(metaclass=CacheType):
|
|||
|
||||
def init_all_values(self):
|
||||
t_start = time.time()
|
||||
logger.info(f'Start init cache: key={self.key}')
|
||||
logger.debug(f'Start init cache: key={self.key}')
|
||||
data = self.compute_values(*self.field_names)
|
||||
self.save_data_to_db(data)
|
||||
logger.info(f'End init cache: cost={time.time()-t_start} key={self.key}')
|
||||
logger.debug(f'End init cache: cost={time.time()-t_start} key={self.key}')
|
||||
return data
|
||||
|
||||
def refresh(self, *fields):
|
||||
|
@ -173,11 +173,11 @@ class Cache(metaclass=CacheType):
|
|||
def expire(self, *fields):
|
||||
self._data = None
|
||||
if not fields:
|
||||
logger.info(f'Delete cached key: key={self.key}')
|
||||
logger.debug(f'Delete cached key: key={self.key}')
|
||||
self.redis.delete(self.key)
|
||||
else:
|
||||
self.redis.hdel(self.key, *fields)
|
||||
logger.info(f'Expire cached fields: key={self.key} fields={fields}')
|
||||
logger.debug(f'Expire cached fields: key={self.key} fields={fields}')
|
||||
|
||||
|
||||
class CacheValueDesc:
|
||||
|
@ -201,7 +201,7 @@ class CacheValueDesc:
|
|||
|
||||
def compute_value(self, instance: Cache):
|
||||
t_start = time.time()
|
||||
logger.info(f'Start compute cache field: field={self.field_name} key={instance.key}')
|
||||
logger.debug(f'Start compute cache field: field={self.field_name} key={instance.key}')
|
||||
if self.field_type.queryset is not None:
|
||||
new_value = self.field_type.queryset.count()
|
||||
else:
|
||||
|
@ -214,7 +214,7 @@ class CacheValueDesc:
|
|||
new_value = compute_func()
|
||||
|
||||
new_value = self.field_type.field_type(new_value)
|
||||
logger.info(f'End compute cache field: cost={time.time()-t_start} field={self.field_name} value={new_value} key={instance.key}')
|
||||
logger.debug(f'End compute cache field: cost={time.time()-t_start} field={self.field_name} value={new_value} key={instance.key}')
|
||||
return new_value
|
||||
|
||||
def to_internal_value(self, value):
|
||||
|
|
|
@ -38,7 +38,7 @@ class OrgRelatedCache(Cache):
|
|||
在事务提交之后再发送信号,防止因事务的隔离性导致未获得最新的数据
|
||||
"""
|
||||
def func():
|
||||
logger.info(f'CACHE: Send refresh task {self}.{fields}')
|
||||
logger.debug(f'CACHE: Send refresh task {self}.{fields}')
|
||||
refresh_org_cache_task.delay(self, *fields)
|
||||
on_commit(func)
|
||||
|
||||
|
@ -93,7 +93,7 @@ class OrgResourceStatisticsCache(OrgRelatedCache):
|
|||
return node.assets_amount
|
||||
|
||||
def compute_total_count_online_users(self):
|
||||
return len(set(Session.objects.filter(is_finished=False).values_list('user_id', flat=True)))
|
||||
return Session.objects.filter(is_finished=False).values_list('user_id').distinct().count()
|
||||
|
||||
def compute_total_count_online_sessions(self):
|
||||
return Session.objects.filter(is_finished=False).count()
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
from django.db.models.signals import m2m_changed
|
||||
from django.db.models.signals import post_save, pre_delete, pre_save
|
||||
from django.db.models.signals import post_save, pre_delete, pre_save, post_delete
|
||||
from django.dispatch import receiver
|
||||
|
||||
from orgs.models import Organization, OrganizationMember
|
||||
|
@ -60,7 +60,6 @@ class OrgResourceStatisticsRefreshUtil:
|
|||
Node: ['nodes_amount'],
|
||||
Asset: ['assets_amount'],
|
||||
UserGroup: ['groups_amount'],
|
||||
Session: ['total_count_online_users', 'total_count_online_sessions']
|
||||
}
|
||||
|
||||
@classmethod
|
||||
|
@ -78,11 +77,34 @@ def on_post_save_refresh_org_resource_statistics_cache(sender, instance, created
|
|||
OrgResourceStatisticsRefreshUtil.refresh_if_need(instance)
|
||||
|
||||
|
||||
@receiver(pre_delete)
|
||||
def on_pre_delete_refresh_org_resource_statistics_cache(sender, instance, **kwargs):
|
||||
@receiver(post_delete)
|
||||
def on_post_delete_refresh_org_resource_statistics_cache(sender, instance, **kwargs):
|
||||
OrgResourceStatisticsRefreshUtil.refresh_if_need(instance)
|
||||
|
||||
|
||||
def _refresh_session_org_resource_statistics_cache(instance: Session):
|
||||
cache_field_name = ['total_count_online_users', 'total_count_online_sessions']
|
||||
|
||||
org_cache = OrgResourceStatisticsCache(instance.org)
|
||||
org_cache.expire(*cache_field_name)
|
||||
OrgResourceStatisticsCache(Organization.root()).expire(*cache_field_name)
|
||||
|
||||
|
||||
@receiver(pre_save, sender=Session)
|
||||
def on_session_changed_refresh_org_resource_statistics_cache(sender, instance, **kwargs):
|
||||
OrgResourceStatisticsRefreshUtil.refresh_if_need(instance)
|
||||
def on_session_pre_save(sender, instance: Session, **kwargs):
|
||||
old = Session.objects.filter(id=instance.id).values_list('is_finished', flat=True)
|
||||
if old:
|
||||
instance._signal_old_is_finished = old[0]
|
||||
else:
|
||||
instance._signal_old_is_finished = None
|
||||
|
||||
|
||||
@receiver(post_save, sender=Session)
|
||||
def on_session_changed_refresh_org_resource_statistics_cache(sender, instance, created, **kwargs):
|
||||
if created or instance.is_finished != instance._signal_old_is_finished:
|
||||
_refresh_session_org_resource_statistics_cache(instance)
|
||||
|
||||
|
||||
@receiver(post_delete, sender=Session)
|
||||
def on_session_deleted_refresh_org_resource_statistics_cache(sender, instance, **kwargs):
|
||||
_refresh_session_org_resource_statistics_cache(instance)
|
||||
|
|
Loading…
Reference in New Issue