refactor: Change the organization statistics cache update strategy to lazy updates

pull/5636/head
xinwen 2021-02-20 14:44:27 +08:00 committed by 老广
parent bb9790a50f
commit 83cc339d4b
4 changed files with 31 additions and 11 deletions

View File

@@ -72,7 +72,7 @@ class Cache(metaclass=CacheBase):
     def get_data(self) -> dict:
         data = cache.get(self.key)
-        logger.debug(f'CACHE: get {self.key} = {data}')
+        logger.debug(f'Get data from cache: key={self.key} data={data}')
         if data is not None:
             data = json.loads(data)
             self._data = data
@@ -81,7 +81,7 @@ class Cache(metaclass=CacheBase):
     def set_data(self, data):
         self._data = data
         to_json = json.dumps(data)
-        logger.info(f'CACHE: set {self.key} = {to_json}, timeout={self.timeout}')
+        logger.info(f'Set data to cache: key={self.key} data={to_json} timeout={self.timeout}')
         cache.set(self.key, to_json, timeout=self.timeout)
 
     def compute_data(self, *fields):
@@ -122,6 +122,16 @@ class Cache(metaclass=CacheBase):
         self.set_data(data)
         return data
 
+    def expire_fields_with_lock(self, *fields):
+        with DistributedLock(name=f'{self.key}.refresh'):
+            data = self.get_data()
+            if data is not None:
+                logger.info(f'Expire cached fields: key={self.key} fields={fields}')
+                for f in fields:
+                    data.pop(f)
+                self.set_data(data)
+            return data
+
     def refresh(self, *fields):
         if not fields:
             # No field specified: refresh all values
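The new expire_fields_with_lock is the core of the lazy strategy: under a distributed lock it loads the cached dict, pops only the expired fields, and writes the remainder back, so those fields are recomputed the next time something reads them. A minimal, self-contained sketch of that pop-and-rewrite pattern (names like fake_cache and expire_fields are illustrative stand-ins, not the project's API; threading.Lock stands in for DistributedLock):

import json
import threading

fake_cache = {}                  # stands in for the Django cache backend
refresh_lock = threading.Lock()  # stands in for DistributedLock

def expire_fields(key, *fields):
    """Drop only the given fields so they get recomputed on the next read."""
    with refresh_lock:                       # serialize the read-modify-write
        raw = fake_cache.get(key)
        if raw is None:
            return None
        data = json.loads(raw)
        for f in fields:
            data.pop(f, None)
        fake_cache[key] = json.dumps(data)   # write the remaining fields back
        return data

fake_cache['org:stats'] = json.dumps({'users_amount': 10, 'assets_amount': 3})
print(expire_fields('org:stats', 'users_amount'))   # {'assets_amount': 3}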
@@ -146,10 +156,13 @@ class Cache(metaclass=CacheBase):
     def reload(self):
         self._data = None
 
-    def delete(self):
-        self._data = None
-        logger.info(f'CACHE: delete {self.key}')
-        cache.delete(self.key)
+    def expire(self, *fields):
+        if not fields:
+            self._data = None
+            logger.info(f'Delete cached key: key={self.key}')
+            cache.delete(self.key)
+        else:
+            self.expire_fields_with_lock(*fields)
 
 
 class CacheValueDesc:
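delete() becomes expire(*fields): with no arguments it still drops the whole key, but with field names it delegates to the per-field path above and keeps the rest of the entry warm. A self-contained sketch of that dispatch (helper names are illustrative, not the project's API):

fake_cache = {'org:stats': {'users_amount': 10, 'assets_amount': 3}}

def expire(key, *fields):
    if not fields:
        fake_cache.pop(key, None)        # full invalidation, like the old delete()
        return
    entry = fake_cache.get(key)
    if entry is not None:
        for f in fields:
            entry.pop(f, None)           # per-field, lazy invalidation

expire('org:stats', 'users_amount')
print(fake_cache)    # {'org:stats': {'assets_amount': 3}}
expire('org:stats')
print(fake_cache)    # {}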
@@ -167,7 +180,8 @@ class CacheValueDesc:
             return self
         if self.field_name not in instance.data:
             instance.refresh(self.field_name)
-        value = instance.data[self.field_name]
+        # Guard against edge cases where the value is still missing, to avoid raising
+        value = instance.data.get(self.field_name)
         return value
 
     def compute_value(self, instance: Cache):
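The descriptor's __get__ is what makes the expiry lazy: a field missing from the cached dict triggers a refresh on read, and switching to .get() keeps a field that is still missing after the refresh from raising KeyError. A self-contained sketch of the same descriptor idea (class and field names are illustrative, not the project's API):

class LazyField:
    def __init__(self, compute):
        self.compute = compute

    def __set_name__(self, owner, name):
        self.field_name = name

    def __get__(self, instance, owner):
        if instance is None:
            return self
        if self.field_name not in instance.data:
            # lazy path: compute only when someone actually reads the field
            instance.data[self.field_name] = self.compute(instance)
        return instance.data.get(self.field_name)


class OrgStats:
    users_amount = LazyField(lambda self: 42)  # stand-in for a real DB count

    def __init__(self):
        self.data = {}                         # stands in for the cached dict


stats = OrgStats()
print(stats.users_amount)              # 42, computed on first read
stats.data.pop('users_amount', None)   # "expire" the field
print(stats.users_amount)              # recomputed lazily on the next read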
@@ -183,5 +197,5 @@ class CacheValueDesc:
         new_value = compute_func()
         new_value = self.field_type.field_type(new_value)
-        logger.info(f'CACHE: compute {instance.key}.{self.field_name} = {new_value}')
+        logger.info(f'Compute cache field value: key={instance.key} field={self.field_name} value={new_value}')
         return new_value

View File

@@ -158,6 +158,7 @@ class DistributedLock(RedisLock):
     def _release(self):
         try:
             self._release_redis_lock()
+            logger.debug(f'I[{self.id}] released lock[{self.name}]')
         except NotAcquired as e:
             logger.error(f'I[{self.id}] release lock[{self.name}] failed {e}')
             self._raise_exc(e)
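Release failures are tolerated because a distributed lock can expire before its holder releases it. A rough sketch of that scenario, assuming a local Redis and the python-redis-lock package (which the NotAcquired exception suggests; the exact wiring here is an assumption, not the project's setup):

import redis
import redis_lock

conn = redis.Redis()
lock = redis_lock.Lock(conn, name='cache_key.refresh', expire=5)

if lock.acquire(blocking=True, timeout=10):
    try:
        pass  # read-modify-write the cached dict here
    finally:
        try:
            lock.release()
        except redis_lock.NotAcquired as exc:
            # the lock can expire (or be taken over) before we release it;
            # log the failure instead of masking the work that already ran
            print(f'release failed: {exc}')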

View File

@@ -32,3 +32,8 @@ class OrgRelatedCache(Cache):
             logger.info(f'CACHE: Send refresh task {self}.{fields}')
             refresh_org_cache_task.delay(self, *fields)
         on_commit(func)
+
+    def expire(self, *fields):
+        def func():
+            super(OrgRelatedCache, self).expire(*fields)
+        on_commit(func)
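OrgRelatedCache.expire defers the actual invalidation with on_commit, so the cache is only touched after the surrounding database transaction commits and concurrent readers cannot re-populate it from pre-commit data. A sketch of that deferral using Django's transaction.on_commit (the enclosing service function and org.members relation are illustrative):

from django.db import transaction

def add_user_to_org(org, user, org_cache):
    # org_cache is an OrgResourceStatisticsCache-like object exposing expire()
    with transaction.atomic():
        org.members.add(user)   # hypothetical M2M write
        # runs only after COMMIT; never runs if the transaction rolls back
        transaction.on_commit(lambda: org_cache.expire('users_amount'))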

View File

@ -118,7 +118,7 @@ def refresh_user_amount_on_user_create_or_delete(user_id):
orgs = Organization.objects.filter(m2m_org_members__user_id=user_id).distinct() orgs = Organization.objects.filter(m2m_org_members__user_id=user_id).distinct()
for org in orgs: for org in orgs:
org_cache = OrgResourceStatisticsCache(org) org_cache = OrgResourceStatisticsCache(org)
org_cache.refresh_async('users_amount') org_cache.expire('users_amount')
@receiver(post_save, sender=User) @receiver(post_save, sender=User)
@@ -144,7 +144,7 @@ def on_org_user_changed_refresh_cache(sender, action, instance, reverse, pk_set,
     for org in orgs:
         org_cache = OrgResourceStatisticsCache(org)
-        org_cache.refresh_async('users_amount')
+        org_cache.expire('users_amount')
 
 
 class OrgResourceStatisticsRefreshUtil:
@@ -166,7 +166,7 @@ class OrgResourceStatisticsRefreshUtil:
         cache_field_name = cls.model_cache_field_mapper.get(type(instance))
         if cache_field_name:
             org_cache = OrgResourceStatisticsCache(instance.org)
-            org_cache.refresh_async(cache_field_name)
+            org_cache.expire(cache_field_name)
 
 
 @receiver(post_save)
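With the signal handlers calling expire() instead of refresh_async(), a write no longer schedules an immediate Celery recomputation; the affected statistic is simply dropped and recomputed on the next read. A sketch of that handler shape (the Asset model and import paths are hypothetical, shown only to contrast the two strategies):

from django.db.models.signals import post_save
from django.dispatch import receiver

from assets.models import Asset                      # hypothetical import path
from orgs.caches import OrgResourceStatisticsCache   # hypothetical import path

@receiver(post_save, sender=Asset)
def on_asset_saved(sender, instance, created, **kwargs):
    org_cache = OrgResourceStatisticsCache(instance.org)
    # eager (before): org_cache.refresh_async('assets_amount')  # recompute right away via Celery
    # lazy (after): drop the field; it is recomputed on the next read
    org_cache.expire('assets_amount')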