diff --git a/apps/common/cache.py b/apps/common/cache.py index b988a8673..67aba9191 100644 --- a/apps/common/cache.py +++ b/apps/common/cache.py @@ -1,5 +1,7 @@ import time +from channels_redis.core import RedisChannelLayer as _RedisChannelLayer + from common.utils.lock import DistributedLock from common.utils.connection import get_redis_client from common.utils import lazyproperty @@ -216,3 +218,29 @@ class CacheValueDesc: def to_internal_value(self, value): return self.field_type.field_type(value) + + +class RedisChannelLayer(_RedisChannelLayer): + async def _brpop_with_clean(self, index, channel, timeout): + cleanup_script = """ + local backed_up = redis.call('ZRANGE', ARGV[2], 0, -1, 'WITHSCORES') + for i = #backed_up, 1, -2 do + redis.call('ZADD', ARGV[1], backed_up[i], backed_up[i - 1]) + end + redis.call('DEL', ARGV[2]) + """ + backup_queue = self._backup_channel_name(channel) + async with self.connection(index) as connection: + # 部分云厂商的 Redis 此操作会报错(不支持,比如阿里云有限制) + try: + await connection.eval(cleanup_script, keys=[], args=[channel, backup_queue]) + except: + pass + result = await connection.bzpopmin(channel, timeout=timeout) + + if result is not None: + _, member, timestamp = result + await connection.zadd(backup_queue, float(timestamp), member) + else: + member = None + return member diff --git a/apps/jumpserver/settings/libs.py b/apps/jumpserver/settings/libs.py index 11aa0ba16..03c10bf1f 100644 --- a/apps/jumpserver/settings/libs.py +++ b/apps/jumpserver/settings/libs.py @@ -96,7 +96,7 @@ else: CHANNEL_LAYERS = { 'default': { - 'BACKEND': 'channels_redis.core.RedisChannelLayer', + 'BACKEND': 'common.cache.RedisChannelLayer', 'CONFIG': { "hosts": [{ 'address': (CONFIG.REDIS_HOST, CONFIG.REDIS_PORT),