通常来讲,在对数据进行“加锁”时,程序首先须要经过获取(acquire)
锁来获得对数据进行排他性访问的能力,而后才能对数据执行一系列操做,最后还要释放(release)
给其余程序。对于可以被多个线程访问的共享内存数据结构(shared-memory data structure)
来讲,这种“先获取锁,而后执行操做,最后释放锁”的动做很是常见。Redis使用WATCH命令来代替对数据进行加锁,由于WATCH只会在数据被其余客户端抢先修改了的状况下通知执行了这个命令的客户端,而不会阻止其余客户端对数据的修改,因此这个命令被称为乐观锁(optimistic locking)
。
分布式锁也有相似的“首先获取锁,而后执行操做,最后释放锁”动做,但这种锁既不是给同一个进程中的多个线程使用,也不是给同一台机器上的多个进程使用,而是由不一样机器上的不一样Redis客户端进行获取和释放的。redis
为了防止客户端在取得锁以后崩溃,并致使锁一直处于“已被获取”的状态,最终版的锁实现将带有超时限制特性:若是得到锁的进程未能在指定的时限内完成操做,那么锁将自动释放。数据结构
致使锁出现不正确行为的缘由,以及锁在不正确运行时的症状:
持有锁的进程由于操做时间过长而致使锁被自动释放,但进程自己并不知晓这一点,甚至还可能会错误地释放掉了其余进程持有的锁。
一个持有锁并打算执行长时间操做的进程已经崩溃,但其余想要获取锁的进程不知道哪一个进程持有着锁,也没法检测出持有锁的进程已经崩溃,只能白白地浪费时间等待锁被释放。
在一个进程持有的锁过时以后,其余多个进程同时尝试去获取锁,而且都得到了锁。
上面第一种状况和第三种状况同时出现,致使有多个进程得到了锁,而每一个进程都觉得本身是惟一一个得到锁的进程。分布式
简易锁
为了对数据进行排他性访问,程序首先要作的就是获取锁。SETNX命令天生就适合用来实现锁的获取
功能,这个命令只会在键不存在
的状况下为键赋值,而锁要作的就是将一个随机生成的128位UUID设置为键的值,并使用这个值来防止锁被其余进程取得。
若是程序尝试获取锁的时候失败,那么它将不断地进行重试,直到成功地取得锁或者超过给定的时限为止。ide
def acquire_lock(conn, lockname, acquire_timeout=10): identifier = str(uuid.uuid4()) //128位随机标识符 end = time.time() + acquire_timeout while time.time() < end: if conn.setnx('lock:' + lockname, identifier): //尝试获取锁 return identifier time.sleep(.001) return False
下面代码展现了使用锁从新实现的商品购买操做:程序首先对市场进行加锁,接着检查商品的价格,并在确保买家有足够的钱来购买商品以后,对钱和商品进行相应的转移。当操做执行完以后,程序就会释放锁。函数
def purchase_item_with_lock(conn, buyerid, itemid, sellerid): buyer = "users:%s"%buyerid sellerid = "users:%s"%sellerid item = "%s.%s"%(itemid, sellerid) inventory = "inventory:%s"%buyerid locked = acquire_lock(conn, market) if not locked: return False pipe = conn.pipeline(True) try://检查指定的商品是否仍在出售,以及买家是否有足够的钱来购买该商品 pipe.zscore("market:", item) pipe.hget(buyer, 'funds') price, funds = pipe.execute() if price is None or price > funds: return None pipe.hincrby(seller, 'funds', int(price)) pipe.hincrby(buyer, 'funds', int(-price)) pipe.sadd(inventory, itemid) pipe.zrem("market:", item) pipe.execute() return True finally: release_lock(conn, market, locked) //释放锁
上面代码的锁彷佛是用来加锁整个购买操做的,但实际上这把锁是用来锁住市场数据的,它之因此会包围着执行购买操做的代码,是由于程序在操做市场数据期间必须一直持有锁。测试
接下面的代码release_lock函数展现了锁释放操做的实现代码:函数首先使用WATCH命令监视表明锁的键,接着检查键目前的值是否和加锁时设置的值相同,而且确认值没有变化以后删除该键(这个检查还能够防止程序错误地释放同一个锁屡次)。ui
def release_lock(conn, lockname, identifier): pipe = conn.pipeline(True) lockname = 'lock:' + lockname while True: try: pipe.watch(lockname) if pipe.get(lockname) == identifier: pipe.multi() pipe.delete(lockname) pipe.execute() return True pipe.unwatch() break except redis.exceptions.WatchError: pass return False
通过测试,与以前WATCH实现相比,锁实现的上架商品数量虽然有所减小,可是在买入商品时却不须要进行重试,而且上架商品数量和买入商品数量之间的比率,也跟卖家数量和买家数量之间的比率接近。线程
带有超时限制的锁
目前的锁实如今持有者崩溃的时候不会自动释放,这将致使锁一直处于已被获取的状态。为了解决这个问题,咱们将为锁加上超时功能。code
为了给锁加上超时的限制特性,程序将在取得锁以后,调用EXPIRE命令来为锁设置过时时间,使得Redis能够自动删除超时的锁。为了确保锁在客户端已经崩溃(客户端在执行介于SETNX和EXPIRE之间的时候崩溃是最糟糕的)的状况下仍然可以自动被释放,客户端会尝试获取锁失败以后,检查锁的超时时间,并为未设置超时时间的锁设置超时时间。由于锁总会带有超时时间,并最终由于超时而自动被释放,使得其余客户端能够继续尝试获取已被释放的锁。进程
须要注意的一点是,由于多个客户端在同一时间内设置的超时时间基本上都是相同的,因此即便有多个客户端同时为同一个锁设置超时时间,锁的超时时间也不会产生太大变化。
def acquire_lock_with_timeout(conn, lockname, acquire_timeout=10, lock_timeout=10): identifier = str(uuid.uuid4()) lockname = 'lock:' + lockname lock_timeout = int(math.ceil(lock_timeout)) end = time.time() + acquire_timeout while time.time() < end: if conn.setnx(lockname, identifier): conn.expire(lockname, lock_timeout) return identifier elif not conn.ttl(lockname): conn.expire(lockname, lock_timeout) time.sleep(.001) return False