import functools import time import weakref import collections class LocalCache(): class Dict(dict): def __del__(self): pass def __init__(self, maxlen=10): self.weak = weakref.WeakValueDictionary() # {'key1':{}, 'key2':{}, 'key3': {}} 内嵌的字典value弱引用self.strong里面的字典元素 self.strong = collections.deque(maxlen=maxlen) # 队列中的元素是字典对象 @staticmethod def nowTime(): return int(time.time()) def get(self, key): ''' 弱引用里面取数据,强引用里面删除对象 ''' value = self.weak.get(key, None) if value is not None: expire = value[r'expire'] if self.nowTime() > expire: return else: return value else: return def set(self, key, value): # to-do 判断key是否已经存在了、判断key是不是可hash对象 self.weak[key] = strongRef = LocalCache.Dict(value) # 这行代码是重点 self.strong.append(strongRef) # 闭包的一种使用场景 def funcCache(expire=0): caches = LocalCache() # 闭包的自由变量 def _wrappend(func): @functools.wraps(func) def __wrappend(*args, **kwargs): key = str(func) + str(args) + str(kwargs) # key是被装饰函数的参数拼接 result = caches.get(key) if result is None: result = func(*args, **kwargs) caches.set(key, {'result': result, 'expire': expire + caches.nowTime()}) result = caches.get(key) return result return __wrappend return _wrappend @funcCache(expire=3) def test_cache(v): print('work 3s') return v print(test_cache(1)) print(test_cache(2)) print(test_cache(1)) # 参数为1 在上面已经有调用过了,因此这个是从缓存中取的 print(test_cache(2)) # 参数为2 在上面已经有调用过了,因此这个是从缓存中取的 # 测试缓存超时后(4s > 3s)的效果 print('\n缓存过时后的结果:') time.sleep(4) print(test_cache(1)) print(test_cache(2))
输出结果php
work 3s {'result': 1, 'expire': 1626521912} work 3s {'result': 2, 'expire': 1626521912} {'result': 1, 'expire': 1626521912} {'result': 2, 'expire': 1626521912} 缓存过时后的结果: work 3s {'result': 1, 'expire': 1626521916} work 3s {'result': 2, 'expire': 1626521916}
弱引用参考:
https://blog.51cto.com/sleepd/1073044
https://blog.csdn.net/z_feng12489/article/details/89433878
本地缓存参考:
https://blog.csdn.net/weixin_34240520/article/details/92022464
functiontools.wraps的做用:
https://www.cnblogs.com/liuwei0824/p/10405212.htmlhtml