Redis: return stale data and update the data asynchronously
#!/usr/bin/env python
# coding=utf-8
import json
import logging
import os
import sys
from binascii import crc32

import redis

root_path = [os.path.dirname(os.path.dirname(os.path.abspath(__file__)))]
sys.path += root_path


class RedisCache(object):
    """JSON value cache spread over one or more Redis servers.

    Every key is routed to one client by ``crc32(key) % number_of_clients``.
    Values are serialized with ``json.dumps`` on write and parsed with
    ``json.loads`` on read.
    """

    # Class-level fallback: ``__init__`` only builds clients when this is unset
    # (a subclass may pre-populate it).
    redis_pool = None

    def __init__(self, hosts=None, duration=600):
        """Create one Redis client per configured server.

        Args:
            hosts: Iterable of ``(host, port, db)`` tuples. A bare host name
                string is accepted and expanded to ``(host, 6379, 0)``.
                Defaults to a single local server.
            duration: Fallback expiry in seconds, used by ``set_value`` when it
                is called with a falsy ``duration``.
        """
        if hosts is None:
            hosts = [('localhost', 6379, 0)]
        self.duration = duration
        # The timeout must be given to the ConnectionPool: redis.Redis ignores
        # its own connection arguments when an explicit pool is supplied.
        self.redis_pool = self.redis_pool or [
            redis.Redis(connection_pool=redis.ConnectionPool(
                host=host, port=port, db=db, socket_timeout=0.5))
            for host, port, db in (self._as_host_tuple(h) for h in hosts)
        ]

    @staticmethod
    def _as_host_tuple(entry):
        """Return ``entry`` as a ``(host, port, db)`` tuple."""
        if isinstance(entry, (bytes, type(u''))):
            return entry, 6379, 0
        return entry

    def _get_redis(self, key=None):
        """Return the client responsible for ``key`` (the first one if falsy)."""
        if not key:
            return self.redis_pool[0]
        # crc32 works on bytes; encode text keys so that Python 3 str and
        # non-ASCII unicode keys are accepted as well.
        if isinstance(key, type(u'')):
            key = key.encode('utf-8')
        return self.redis_pool[crc32(key) % len(self.redis_pool)]

    def get_value(self, key):
        """Return the decoded value stored under ``key``, or ``None``.

        Raises:
            ValueError: if the stored payload is not valid JSON.
        """
        raw = self._get_redis(key).get(key)
        if not raw:
            return None
        return json.loads(raw)

    def set_value(self, key, value, duration=60):
        """Store ``value`` as JSON under ``key`` with an expiry in seconds.

        A falsy ``duration`` falls back to ``self.duration``.
        """
        ttl = int(duration or self.duration)
        # One pipeline so the value is never left behind without its expiry.
        client = self._get_redis(key)
        client.pipeline().set(key, json.dumps(value)).expire(key, ttl).execute()

    def delete(self, key):
        """Remove ``key`` from the server it is routed to."""
        self._get_redis(key).delete(key)
设置 stale 时段:在此期间的 get 请求会立即返回(旧)数据,随后通过异步任务更新数据。
这里用了 tornado.ioloop;换用任意语言的异步机制当然也没有问题。
from tornado.ioloop import IOLoop


class StaleRedisCache(RedisCache):
    """Redis cache that serves stale values while a refresh is scheduled.

    Entries are written with a TTL of ``duration + stale`` seconds. Once the
    remaining TTL of an entry drops below ``stale``, ``get_value`` still
    returns the cached value immediately and schedules ``callback`` on the
    Tornado IOLoop to recompute and store a new one.
    """

    def __init__(self, hosts=None, duration=600, stale=7200):
        """Set up the clients and the stale window.

        Args:
            hosts: Iterable of ``(host, port, db)`` tuples; defaults to a
                single local server.
            duration: Default lifetime in seconds before an entry is
                considered stale.
            stale: Extra seconds an expired-but-usable entry is kept around.
        """
        if hosts is None:
            hosts = [('localhost', 6379, 0)]
        self.stale = stale
        RedisCache.__init__(self, hosts, duration)

    def get_value(self, key, callback=None, *args, **kwargs):
        """Return the value for ``key``, refreshing it through ``callback``.

        Args:
            key: Cache key.
            callback: Callable producing a fresh value. Without it the method
                is a plain read and never writes to Redis.
            *args, **kwargs: Forwarded to ``callback``. ``kwargs['duration']``,
                when present, also overrides ``self.duration`` for the TTL.

        Returns:
            The cached value when one is usable (``None`` if the entry is
            empty), or the freshly computed value when the key was missing.

        Raises:
            ValueError: if the stored payload is not valid JSON.
        """
        client = self._get_redis(key)
        ttl, raw = client.pipeline().ttl(key).get(key).execute()
        cached = json.loads(raw) if raw else None

        # Nothing can be refreshed without a callback, and nothing needs to be
        # while the entry is outside its stale window.
        if not callback or (ttl and ttl >= self.stale):
            return cached

        def refresh():
            value = callback(*args, **kwargs)
            logging.info("set_value for key %s", key)
            # Keep the entry for `stale` seconds beyond its nominal lifetime so
            # that it can still be served while the next refresh runs.
            expiry = int(kwargs.get('duration', self.duration)) + self.stale
            client.pipeline().set(key, json.dumps(value)).expire(key, expiry).execute()
            return value

        # Depending on the Redis version, the TTL of a missing key is reported
        # as None (<= 2.6.x) or -2 (>= 2.6): compute the value synchronously.
        if not ttl or ttl == -2:
            return refresh()

        # Stale but present: answer now, refresh on the next loop iteration.
        IOLoop.current().add_callback(refresh)
        return cached

    # A separate set_value is no longer needed because get_value already
    # stores the refreshed value itself.
    def set_value(self, key, value, duration=60):
        """Intentionally a no-op; writes happen inside ``get_value``."""
        pass
# Usage sketch: one shared cache instance plus a loader for the real data.
stale_cache = StaleRedisCache([('localhost', 6379, 0)])


def sc_callback():
    """Load the fresh value from the source of truth (e.g. MongoDB)."""
    # Placeholder: replace with the real query.
    raise NotImplementedError("fetch the value from the backing store here")


def abc():
    """Read 'key1' through the stale cache, refreshing it via sc_callback."""
    value = stale_cache.get_value('key1', sc_callback)
    return value
有没有很简洁呢~
在第一张示意图中,设置了 stale=100,比 self.duration=600 还小;这样其实并没有发挥 stale 异步更新数据的优势。
stale Redis 部分的代码做了 2 处改动:
这样一来,最终的效果是: