Python Caching

The Purpose of Caching

A cache stores a limited amount of data so that future requests for that data can be served faster.

A Simple Cache Class of Our Own

import datetime
import pprint
import random

class MyCache(object):
    def __init__(self):
        self.cache = {}
        self.max_cache_size = 10

    def __contains__(self, key):
        """
        Return True if the key is in the cache.
        Implementing this magic method lets callers check membership
        on an instance with: key in cache_instance
        """
        return key in self.cache

    def update(self, key, value):
        """
        Update the cache dictionary, evicting the oldest entry first
        if the cache is already full.
        """
        if key not in self.cache and len(self.cache) >= self.max_cache_size:
            self.remove_oldest()
        self.cache[key] = {"date_accessed": datetime.datetime.now(), "value": value}

    def remove_oldest(self):
        """
        Remove the entry with the oldest access time.
        """
        oldest_entry = None
        for key in self.cache:
            if oldest_entry is None:
                oldest_entry = key
            elif self.cache[key]["date_accessed"] < self.cache[oldest_entry]["date_accessed"]:
                oldest_entry = key
        self.cache.pop(oldest_entry)

    @property
    def size(self):
        """
        Number of entries currently in the cache.
        """
        return len(self.cache)
  • __contains__ is not strictly required here, but the idea is that it lets us check a class instance with `key in cache` to see whether it holds the key we are looking for.
  • The update method updates the cache dictionary with a new key/value pair. Once the cache reaches or exceeds its maximum size, it also evicts the entry with the oldest access date.
  • The remove_oldest method does the actual work of deleting the oldest entry from the dictionary.
  • Finally, the size property returns the number of entries currently in the cache.

After running this code you will notice that once the cache is full, the oldest entry gets evicted. Note, however, that the example never updates the access date on reads — that is, looking up an entry does not reset its timestamp to "now".
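To close that gap, a get method can refresh date_accessed on every read. The following is a minimal sketch of such a variant (not the author's code); it uses a monotonic counter in place of datetime.now() so that two accesses can never tie on a timestamp:

```python
import itertools

class LRUCache:
    """Variant of MyCache above in which reads also refresh the access
    time, giving true least-recently-used eviction."""

    def __init__(self, max_cache_size=10):
        self.cache = {}
        self.max_cache_size = max_cache_size
        self._clock = itertools.count()  # monotonically increasing "time"

    def __contains__(self, key):
        return key in self.cache

    def get(self, key):
        entry = self.cache.get(key)
        if entry is None:
            return None
        entry["date_accessed"] = next(self._clock)  # refresh on read
        return entry["value"]

    def update(self, key, value):
        if key not in self.cache and len(self.cache) >= self.max_cache_size:
            # evict the entry with the smallest (oldest) access stamp
            oldest = min(self.cache, key=lambda k: self.cache[k]["date_accessed"])
            self.cache.pop(oldest)
        self.cache[key] = {"date_accessed": next(self._clock), "value": value}
```

With max_cache_size=2, updating "a" and "b", reading "a", then inserting "c" evicts "b" rather than "a", because the read refreshed "a".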

Testing it:

if __name__ == "__main__":
    keys = ["test", "red", "fox", "fence", "junk",
            "other", "alpha", "bravo", "cal", "devo",
            "ele"]

    s = "abcdefghijklmnop"
    cache = MyCache()
    for i, key in enumerate(keys):
        if key in cache:
            continue
        else:
            value = "".join(random.choice(s) for j in range(20))
            cache.update(key, value)
        print(f"{i + 1} iterations, {cache.size} cached entries")
        print()
    print(pprint.pformat(cache.cache))
    print("test" in cache)   # the effect of implementing __contains__
    print("cal" in cache)

Using the lru_cache Decorator

import time
import urllib.error
import urllib.request
from functools import lru_cache

@lru_cache(maxsize=24)
def get_webpage(module):
    """
    Fetch the documentation page for the given Python module.
    """
    webpage = "https://docs.python.org/3/library/{}.html".format(module)
    try:
        with urllib.request.urlopen(webpage) as request:
            return request.read()
    except urllib.error.HTTPError:
        return None


if __name__ == '__main__':
    t1 = time.time()
    modules = ['functools', 'collections', 'os', 'sys']
    for module in modules:
        page = get_webpage(module)
        if page:
            print("{} module page found".format(module))
    t2 = time.time()
    for m in modules:
        page = get_webpage(m)
        if page:
            print(f"{m} get again ...")
    t3 = time.time()

    print(t2-t1)
    print(t3-t2)
    print((t2-t1) / (t3-t2))

We decorated the get_webpage function with lru_cache and set its maximum size to 24 calls. The function builds the page URL from the module name passed in, which lets us call it in a loop over several modules. On the first pass the output appears relatively slowly, but running the same calls again within the same session is dramatically faster — which means lru_cache has cached the calls correctly. The timings printed at the end make the speedup concrete.

You can also pass a typed argument to the decorator. It is a boolean which, when set to True, tells the decorator to cache arguments of different types separately.
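The effect of typed — and the cache's hit/miss behaviour in general — can be inspected through cache_info(), which every lru_cache-wrapped function exposes. A small illustrative example:

```python
from functools import lru_cache

@lru_cache(maxsize=24, typed=True)
def square(x):
    # pretend this is an expensive computation
    return x * x

square(3)    # miss: first call with the int key 3
square(3)    # hit: served from the cache
square(3.0)  # miss: with typed=True, the float 3.0 is cached separately
print(square.cache_info())  # CacheInfo(hits=1, misses=2, maxsize=24, currsize=2)
```

With the default typed=False, square(3) and square(3.0) would share one cache entry instead.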

Using the cachetools Module

Code source: www.thepythoncorner.com/2018/04/how…

The original article shows how to use caching to speed up a Python program with the following two examples. Without caching:

import time
import datetime


def get_candy_price(candy_id):
    # let's use a sleep to simulate the time your function spends trying to connect to
    # the web service, 5 seconds will be enough.
    time.sleep(5)

    # let's pretend that the price returned by the web service is $1 for candies with an
    # odd candy_id and $1.5 for candies with an even candy_id

    price = 1.5 if candy_id % 2 == 0 else 1

    return (datetime.datetime.now().strftime("%c"), price)


# now, let's simulate 20 customers in your show.
# They are asking for candy with id 2 and candy with id 3...
for i in range(0, 20):
    print(get_candy_price(2))
    print(get_candy_price(3))

After adding the cache:

import time
import datetime

from cachetools import cached, TTLCache  # 1 - let's import the "cached" decorator and the "TTLCache" object from cachetools
cache = TTLCache(maxsize=100, ttl=300)  # 2 - let's create the cache object.


@cached(cache)  # 3 - it's time to decorate the method to use our cache system!
def get_candy_price(candy_id):
    # let's use a sleep to simulate the time your function spends trying to connect to
    # the web service, 5 seconds will be enough.
    time.sleep(5)

    # let's pretend that the price returned by the web service is $1 for candies with an
    # odd candy_id and $1.5 for candies with an even candy_id

    price = 1.5 if candy_id % 2 == 0 else 1

    return (datetime.datetime.now().strftime("%c"), price)


# now, let's simulate 20 customers in your show.
# They are asking for candy with id 2 and candy with id 3...
for i in range(0, 20):
    print(get_candy_price(2))
    print(get_candy_price(3))

The output is not shown here; copy and run the code yourself to see the difference.
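For intuition, the core of what TTLCache provides can be approximated in a few lines of stdlib Python. This is only a sketch of the concept — cachetools additionally handles maxsize eviction and thread safety — and a hand-rolled timer is injected so that expiry can be demonstrated without sleeping:

```python
import time

class TinyTTLCache:
    """A dict with per-entry expiry, roughly the idea behind TTLCache."""

    def __init__(self, ttl, timer=time.monotonic):
        self.ttl = ttl
        self.timer = timer
        self._data = {}  # key -> (expires_at, value)

    def set(self, key, value):
        self._data[key] = (self.timer() + self.ttl, value)

    def get(self, key, default=None):
        item = self._data.get(key)
        if item is None:
            return default
        expires_at, value = item
        if self.timer() >= expires_at:
            del self._data[key]  # lazily evict the expired entry
            return default
        return value

# a fake clock we can advance by hand, so the demo needs no sleeping
clock = {"now": 0.0}
cache = TinyTTLCache(ttl=300, timer=lambda: clock["now"])
cache.set("candy-2", 1.5)
assert cache.get("candy-2") == 1.5
clock["now"] += 301           # five minutes pass
assert cache.get("candy-2") is None
```

Injecting the timer as a parameter mirrors cachetools' own design, where TTLCache accepts a timer argument.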

Multi-Level Caching

The caching approaches above are all broadly similar, but none of them solves my problem: I want to set and look up cached values by multiple conditions — treating the cache like a simple queryable database rather than plain key/value pairs. I found the cacheout module and tried to implement what I need with it.

Using cacheout

Links

github.com/dgilland/ca… cacheout.readthedocs.io/en/latest/m…

Overview

cacheout is a caching library for Python.

Features

  • In-memory caching using dictionary backend
  • Cache manager for easily accessing multiple cache objects
  • Reconfigurable cache settings for runtime setup when using module-level cache objects
  • Maximum cache size enforcement
  • Default cache TTL (time-to-live) as well as custom TTLs per cache entry
  • Bulk set, get, and delete operations
  • Bulk get and delete operations filtered by string, regex, or function
  • Memoization decorators
  • Thread safe
  • Multiple cache implementations:
    • FIFO (First In, First Out)
    • LIFO (Last In, First Out)
    • LRU (Least Recently Used)
    • MRU (Most Recently Used)
    • LFU (Least Frequently Used)
    • RR (Random Replacement)


Roadmap

  • Layered caching (multi-level caching)
  • Cache event listener support (e.g. on-get, on-set, on-delete)
  • Cache statistics (e.g. cache hits/misses, cache frequency, etc)


Installation

pip install cacheout

Requirements

Python >= 3.4

Basic Usage

Create a cache object:

# start with some basic caching by creating a cache object:
from cacheout import Cache
cache = Cache()

By default the cache holds up to 256 entries with TTL expiration turned off; cache = Cache() is equivalent to:

# By default the cache object will have a maximum size of 256 and default TTL expiration turned off. These values can be set with:
cache = Cache(maxsize=256, ttl=0, timer=time.time, default=None)  # defaults

Set a value:

# Set a cache key using cache.set():
cache.set(1, 'foobar')

Get a value:

# Get the value of a cache key with cache.get():
assert cache.get(1) == 'foobar'

Provide a default value to fall back on when a key is missing:

# Get a default value when cache key isn't set:
assert cache.get(2) is None
assert cache.get(2, default=False) is False
assert 2 not in cache

Note that this fallback value is not stored in the cache.

Provide a global default:

# Provide a global default:
cache2 = Cache(default=True)
assert cache2.get('missing') is True
assert 'missing' not in cache2

cache3 = Cache(default=lambda key: key)
assert cache3.get('missing') == 'missing'
# with a callable default, the computed value for 'missing' IS stored in the cache
assert 'missing' in cache3

Set a per-entry TTL (time-to-live):

# Set the TTL (time-to-live) expiration per entry:
cache.set(3, {'data': {}}, ttl=1)
assert cache.get(3) == {'data': {}}
time.sleep(1)
assert cache.get(3) is None

Memoize a function's results:

# Memoize a function where cache keys are generated from the called function parameters:
@cache.memoize()
def func(a, b):
    return a + b 

# Provide a TTL for the memoized function and incorporate argument types into generated cache keys:
@cache.memoize(ttl=5, typed=True)
def func(a, b):
    print("--- into --- func ---")
    return a + b

# func(1, 2) has different cache key than func(1.0, 2.0), whereas,
# with "typed=False" (the default), they would have the same key

print(func(1, 2))
print(func(1, 2))
print(func.uncached(1, 2))  # call the original, uncached function
print(func(1, 2))

Get a copy of the entire cache:

# Get a copy of the entire cache with cache.copy():
assert cache.copy() == {1: 'foobar', 2: ('foo', 'bar', 'baz')}

Delete a value from the cache:

# Delete a cache key with cache.delete():
cache.delete(1)
assert cache.get(1) is None

Clear the entire cache:

# Clear the entire cache with cache.clear():
cache.clear()
assert len(cache) == 0

Bulk set, get, and delete:

# Perform bulk operations with cache.set_many(), cache.get_many(), and cache.delete_many():
cache.set_many({'a': 1, 'b': 2, 'c': 3})
assert cache.get_many(['a', 'b', 'c']) == {'a': 1, 'b': 2, 'c': 3}
cache.delete_many(['a', 'b', 'c'])
assert cache.count() == 0

Filtering in bulk get and delete:

# Use complex filtering in cache.get_many() and cache.delete_many():

import re
cache.set_many({'a_1': 1, 'a_2': 2, '123': 3, 'b': 4})

assert cache.get_many('a_*') == {'a_1': 1, 'a_2': 2}
assert cache.get_many(re.compile(r'\d')) == {'123': 3}
assert cache.get_many(lambda key: '2' in key) == {'a_2': 2, '123': 3}

cache.delete_many('a_*')
assert dict(cache.items()) == {'123': 3, 'b': 4}

Reconfigure the cache object after creation:

# Reconfigure the cache object after creation with cache.configure():
cache.configure(maxsize=1000, ttl=5 * 60)

Get the cache's keys, values, and items like a dict:

# Get keys, values, and items from the cache with cache.keys() cache.values(), and cache.items():

cache.set_many({'a': 1, 'b': 2, 'c': 3})
assert list(cache.keys()) == ['a', 'b', 'c']
assert list(cache.values()) == [1, 2, 3]
assert list(cache.items()) == [('a', 1), ('b', 2), ('c', 3)]

Iterate over the cache:

# Iterate over cache keys:

for key in cache:
    print(key, cache.get(key))
    # 'a' 1
    # 'b' 2
    # 'c' 3

Check whether a key exists in the cache:

# Check if key exists with cache.has() and key in cache:
assert cache.has('a')
assert 'a' in cache

Managing Multiple Caches with CacheManager

from cacheout import CacheManager

cacheman = CacheManager({'a': {'maxsize': 100},
                         'b': {'maxsize': 200, 'ttl': 900},
                         'c': {}})

cacheman['a'].set('key1', 'value1')
value = cacheman['a'].get('key1')

cacheman['b'].set('key2', 'value2')
assert cacheman['b'].maxsize == 200
assert cacheman['b'].ttl == 900

cacheman['c'].set('key3', 'value3')

cacheman.clear_all()
for name, cache in cacheman:
    assert name in cacheman
    assert len(cache) == 0

The multi-level caching mentioned at the end should solve my problem: if my API takes two independent variables, stock type and time, I can put the stock type in a first-level cache and the time in a second-level cache.

The code could roughly be written as follows (original screenshot missing): [image]
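Since the original code screenshot is missing, here is one hypothetical sketch of the idea using plain nested dictionaries (stock_type and query_date are invented parameter names; the same shape could also be built from cacheout caches, one per stock type, managed by CacheManager):

```python
class TwoLevelCache:
    """First level keyed by stock type, second level keyed by date."""

    def __init__(self):
        self._levels = {}  # stock_type -> {query_date -> value}

    def set(self, stock_type, query_date, value):
        # create the second-level dict on first use of this stock type
        self._levels.setdefault(stock_type, {})[query_date] = value

    def get(self, stock_type, query_date, default=None):
        return self._levels.get(stock_type, {}).get(query_date, default)

cache = TwoLevelCache()
cache.set("stock_a", "2018-04-01", {"price": 10.0})
assert cache.get("stock_a", "2018-04-01") == {"price": 10.0}
assert cache.get("stock_b", "2018-04-01") is None  # unknown first-level key
```

Expiry and size limits per level could then be layered on top, e.g. by replacing the inner dicts with TTL caches.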

My earlier ideas were to (1) keep the cache in a class variable, or (2) use a Redis cache.
