1,数据一致性python
当多个进程/线程对同一个共享资源读写,会由于资源的争夺而出现混乱,致使数据不一致。mysql
以下图:redis
在数据库的原始数据是 d0,上图的处理流程以下:算法
t1 时刻,有两个数据源的数据 d1,d2 分别到达数据处理层,主进程分配线程 Merge1 处理 d1,Merge2 处理 d2,二者又同时(假设仍是 t1 )从数据库获取原始数据 d0
t2 时刻,Merge1 合并完 d0 和 d1 的数据,并将合并后的数据存到数据库,数据库的数据变成 d0 + d1
t3 时刻,Merge2 合并完 d0 和 d2 的数据,并将合并后的数据存到数据库,数据库的数据变成 d0 + d2
t1 到 t3,数据库最终的数据变成了 d0 + d2,数据源 d1 的数据消失,出现数据不一致问题。
上面所列的问题,是因为多线程同时对某一个共享数据进行读写致使,咱们只要找到一种方案,使得对共享数据的访问是同步的,便可解决该问题。当有某个线程或者进程已经访问了该数据,其余进程或者线程就必须等待其访问结束,才可拥有该共享数据的访问权(进入临界区)。最简单的方式,就是加个同步锁。sql
锁的实现方式,按照应用的实现架构,可能会有如下几种类型:数据库
若是处理程序是单进程多线程的,在 python下,就可使用 threading 模块的 Lock 对象来限制对共享变量的同步访问,实现线程安全。缓存
单机多进程的状况,在 python 下,可使用 multiprocessing 的 Lock 对象来处理。安全
多机多进程部署的状况,就得依赖一个第三方组件(存储锁对象)来实现一个分布式的同步锁了。服务器
2,分布式锁实现方式网络
目前主流的分布式锁实现方式有如下几种:
基于数据库来实现,如 mysql
基于缓存来实现,如 redis
基于 zookeeper 来实现
下面咱们简单介绍下这几种锁的实现。
2.1,基于数据库的锁:
基于数据库的锁实现也有两种方式,一是基于数据库表,另外一种是基于数据库排他锁。
基于数据库表的增删:
基于数据库表增删是最简单的方式,首先建立一张锁的表主要包含下列字段:方法名,时间戳等字段。
具体使用的方法,当须要锁住某个方法时,往该表中插入一条相关的记录。这边须要注意,方法名是有惟一性约束的,若是有多个请求同时提交到数据库的话,数据库会保证只有一个操做能够成功,那么咱们就能够认为操做成功的那个线程得到了该方法的锁,能够执行方法体内容。执行完毕,须要delete该记录。
对于上述方案能够进行优化,如应用主从数据库,数据之间双向同步。一旦挂掉快速切换到备库上;作一个定时任务,每隔必定时间把数据库中的超时数据清理一遍;使用while循环,直到insert成功再返回成功,虽然并不推荐这样作;还能够记录当前得到锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,若是当前机器的主机信息和线程信息在数据库能够查到的话,直接把锁分配给他就能够了,实现可重入锁。
数据库的排他锁:
基于MySql的InnoDB引擎,可使用如下方法来实现加锁操做。
在查询语句后面增长for update,数据库会在查询过程当中给数据库表增长排他锁。当某条记录被加上排他锁以后,其余线程没法再在该行记录上增长排他锁。其余没有获取到锁的就会阻塞在上述select语句上,可能的结果有2种,在超时以前获取到了锁,在超时以前仍未获取到锁。
得到排它锁的线程便可得到分布式锁,当获取到锁以后,能够执行方法的业务逻辑,执行完方法以后,释放锁 connection.commit() 。
存在的问题主要是性能不高和sql超时的异常。
2.2,基于zookeeper实现
基于zookeeper临时有序节点能够实现的分布式锁。每一个客户端对某个方法加锁时,在zookeeper上的与该方法对应的指定节点的目录下,生成一个惟一的瞬时有序节点。 判断是否获取锁的方式很简单,只须要判断有序节点中序号最小的一个。 当释放锁的时候,只需将这个瞬时节点删除便可。同时,其能够避免服务宕机致使的锁没法释放,而产生的死锁问题。
提供的第三方库有 curator ,具体使用读者能够自行去看一下。Curator提供的InterProcessMutex是分布式锁的实现。acquire方法获取锁,release方法释放锁。另外,锁释放、阻塞锁、可重入锁等问题均可以有有效解决。讲下阻塞锁的实现,客户端能够经过在ZK中建立顺序节点,而且在节点上绑定监听器,一旦节点有变化,Zookeeper会通知客户端,客户端能够检查本身建立的节点是否是当前全部节点中序号最小的,若是是就获取到锁,即可以执行业务逻辑。
最后,Zookeeper实现的分布式锁其实存在一个缺点,那就是性能上可能并无缓存服务那么高。由于每次在建立锁和释放锁的过程当中,都要动态建立、销毁瞬时节点来实现锁功能。ZK中建立和删除节点只能经过Leader服务器来执行,而后将数据同不到全部的Follower机器上。并发问题,可能存在网络抖动,客户端和ZK集群的session链接断了,zk集群觉得客户端挂了,就会删除临时节点,这时候其余客户端就能够获取到分布式锁了。
用法参考:https://yunjianfei.iteye.com/blog/2164888
2.3,基于缓存redis实现
相对于基于数据库实现分布式锁的方案来讲,基于缓存来实如今性能方面会表现的更好一点,存取速度快不少。并且不少缓存是能够集群部署的,能够解决单点问题。
使用redis的SETNX实现分布式锁,多个进程执行如下Redis命令:
SETNX lock.id <current Unix time + lock timeout + 1>
SETNX是将 key 的值设为 value,当且仅当 key 不存在。若给定的 key 已经存在,则 SETNX 不作任何动做。
返回1,说明该进程得到锁,SETNX将键 lock.id 的值设置为锁的超时时间,当前时间 +加上锁的有效时间。
返回0,说明其余进程已经得到了锁,进程不能进入临界区。进程能够在一个循环中不断地尝试 SETNX 操做,以得到锁。
3,分布式锁保持数据一致的原理
每种实现方式各有千秋,综合考量,咱们最终决定使用 redis,主要缘由是:
redis 是基于内存来操做,存取速度比数据库快,在高并发下,加锁以后的性能不会降低太多
redis 能够设置键值的生存时间(TTL)
redis 的使用方式简单,整体实现开销小
同时使用 redis 实现的分布锁还须要具有如下几个条件:
同一个时刻只能有一个线程占有锁,其余线程必须等待直到锁被释放
锁的操做必须知足原子性
不会发生死锁,例如已得到锁的线程在释放锁以前忽然异常退出,致使其余线程会一直在循环等待锁被释放
锁的添加和释放必须由同一个线程来设置
咱们在上图 的基础上,在 Data process 和 Database 之间加了一层锁,咱们在 redis 中使用添加了一个 lock_key 来做为锁的标识,流程图以下:
仍是假设某台机器(图中的machine)在数据库的原始数据是 d0,上图的处理流程变成了:
t1 时刻,有两个数据源的数据 d1,d2 同时到达数据处理层,主进程分配了线程 Merge1 处理 d1,线程 Merge2 处理 d2,二者又同时尝试从 redis 得到锁
t2 时刻,Merge1 成功得到了锁,同时从数据库中加载 machine 的原始数据 d0,Merge2 循环等待 Merge1 释放锁
t3 时刻,Merge1 合并完数据,并将合并好的数据 d0 + d1 存放到数据库,最后释放锁
t4 时刻,Merge2 得到了锁,同时从数据库中加载machine的数据 d0 + d1
t5 时刻,Merge2 合并完数据,并将合并好的数据 d0 + d1 + d2 存放到数据库,最后释放锁
从以上能够看到保持数据一致的原理其实也不难,无非就是使用一个键值来使得多个线程对同一台机器的数据的读写是同步的,可是在实现的过程当中,每每会忽视了分布式锁所要具有的某个条件,极端状况下,仍是会出现数据不一致的问题。
几个要用到的redis命令:
setnx(key, value):“set if not exits”,若该key-value不存在,则成功加入缓存而且返回1,不然返回0。
get(key):得到key对应的value值,若不存在则返回nil。
getset(key, value):先获取key对应的value值,若不存在则返回nil,而后将旧的value更新为新的value。
expire(key, seconds):设置key-value的有效期为seconds秒。
4,死锁的问题
SETNX实现分布式锁,可能会存在死锁的状况。与单机模式下的锁相比,分布式环境下不只须要保证进程可见,还须要考虑进程与锁之间的网络问题。某个线程获取了锁以后,断开了与Redis 的链接,锁没有及时释放,竞争该锁的其余线程都会hung,产生死锁的状况。
在使用 SETNX 得到锁时,咱们将键 lock.id 的值设置为锁的有效时间,线程得到锁后,其余线程还会不断的检测锁是否已超时,若是超时,等待的线程也将有机会得到锁。然而,锁超时,咱们不能简单地使用 DEL 命令删除键 lock.id 以释放锁。
考虑如下状况:
A已经首先得到了锁 lock.id,而后线A断线。B,C都在等待竞争该锁;
B,C读取lock.id的值,比较当前时间和键 lock.id 的值来判断是否超时,发现超时;
B执行 DEL lock.id命令,并执行 SETNX lock.id 命令,并返回1,B得到锁;
C因为各刚刚检测到锁已超时,执行 DEL lock.id命令,将B刚刚设置的键 lock.id 删除,执行 SETNX lock.id命令,并返回1,即C得到锁。
上面的步骤很明显出现了问题,致使B,C同时获取了锁。在检测到锁超时后,线程不能直接简单地执行 DEL 删除键的操做以得到锁。
对于上面的步骤进行改进,问题是出在删除键的操做上面,那么获取锁以后应该怎么改进呢?
首先看一下redis的GETSET这个操做, GETSET key value ,将给定 key 的值设为 value ,并返回 key 的旧值(old value)。利用这个操做指令,咱们改进一下上述的步骤。
A已经首先得到了锁 lock.id,而后线A断线。B,C都在等待竞争该锁;
B,C读取lock.id的值,比较当前时间和键 lock.id 的值来判断是否超时,发现超时;
B检测到锁已超时,即当前的时间大于键 lock.id 的值,B会执行
GETSET lock.id <current Unix timestamp + lock timeout + 1> 设置时间戳,经过比较键 lock.id 的旧值是否小于当前时间,判断进程是否已得到锁;
B发现GETSET返回的值小于当前时间,则执行 DEL lock.id命令,并执行 SETNX lock.id 命令,并返回1,B得到锁;
C执行GETSET获得的时间大于当前时间,则继续等待。
在线程释放锁,即执行 DEL lock.id 操做前,须要先判断锁是否已超时。若是锁已超时,那么锁可能已由其余线程得到,这时直接执行 DEL lock.id 操做会致使把其余线程已得到的锁释放掉。
使用Zookeeper实现分布式锁的优势:
有效的解决单点问题,不可重入问题,非阻塞问题以及锁没法释放的问题。实现起来较为简单。
使用Zookeeper实现分布式锁的缺点:
性能上不如使用缓存实现分布式锁。 须要对ZK的原理有所了解。
5,redis代码实现分布式锁
结合上面(分布式锁保持数据一致的原理)提到的使用redis分布式锁的三种条件,使用三种不一样获取redis锁的方式,探索分布式锁的使用方法。
下面使用同一份测试代码:
咱们启用了多线程去对 redis 中的 test_key 的值进行自增操做,理想状况,test_key 的值应该等于线程的数量,好比开了 10 个线程,test_key的值最终应该是 10。
import threading, time, redis from redis import StrictRedis def increase_data(redis_conn, lock, key): lock_value = lock.get_lock(key) #获取锁 value = redis_conn.get(key) #获取数据 time.sleep(0.1) if value: value = int(value) + 1 else: value = 0 redis_conn.set(key, value) thread_name = threading.current_thread().name print(thread_name, value) lock.del_lock(key) #释放锁 ##主程序 if __name__ == "__main__": pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=8) redis = StrictRedis(connection_pool=pool) lock = RedisLock(redis) key = 'test_key' thread_count = 10 redis.delete(key) for i in range(thread_count): thread = threading.Thread(target=increase_data, args=(redis, lock, key)) thread.start()
方式一:加锁操做非原子性
在这个版本中,当线程 A get(key) 的值为空时,set key 的值为 1,并返回,这表示线程 A 得到了锁,能够继续执行后面的操做,不然须要一直循环去获取锁,直到 key 的值再次为空,从新得到锁,执行任务完毕后释放锁。
class RedisLock(object): def __init__(self, redis_conn): self.redis_conn = redis_conn def get_lock_key(self, key): lock_key = 'lock_%s' %key return lock_key def get_lock(self, key): lock_key = self.get_lock_key(key) while True: value = self.redis_conn.get(lock_key) if not value: self.redis_conn.set(lock_key, 1) return True time.sleep(0.01) def del_lock(self, key): lock_key = self.get_lock_key(key) return self.redis_conn.delete(lock_key)
执行测试脚本,获得的结果以下:
Thread-1 1 Thread-5 2 Thread-2 2 Thread-6 3 Thread-7 3 Thread-4 3 Thread-9 4 Thread-8 5 Thread-10 5 Thread-3 5
观察结果就发现,同时有多个线程输出的结果是同样的。乍一看上面加锁的代码逻辑彷佛没啥问题,可是结果却事与愿违,缘由是上面的代码 get(key) 和 set(key, value) 并非原子性的,A 线程在 get(key) 的时候发现是空值,因而从新 set(key, value),但在 set 完成的前一刻,B 线程刚好 get(key) 的时候获得的仍是空值,而后也顺利得到锁,致使数据被两个或多个线程同时修改,最后出现不一致。
方式二:使用 setnx 来实现
鉴于上面版本是因为命令不是原子性操做形成两个或多个线程同时得到锁的问题,这个版本改为使用 redis 的 setnx 命令来进行锁的查询和设置操做,setnx 即 set if not exists,顾名思义就是当key不存在的时候才设置 value,并返回 1,若是 key 已经存在,则不进行任何操做,返回 0。
def get_lock(self, key): lock_key = self.get_lock_key(key) while True: value = self.redis_conn.setnx(lock_key, 1) if value: return True time.sleep(0.01)
代码执行结果:
('Thread-1', 0) ('Thread-9', 1) ('Thread-4', 2) ('Thread-8', 3) ('Thread-7', 4) ('Thread-10', 5) ('Thread-2', 6) ('Thread-6', 7) ('Thread-5', 8) ('Thread-3', 9)
结果是正确的,可是若是知足于此,仍是会出问题的,好比假设 A 线程得到了锁后,因为某种异常缘由致使线程 crash了,一直不释放锁呢?咱们稍微改一下测试用例的 increase 函数,模拟某个线程在释放锁以前由于异常退出。
def increase_data(redis_conn, lock, key): lock_value = lock.get_lock(key) #获取锁 value = redis_conn.get(key) #获取数据 time.sleep(0.1) if not value: value = int(value) + 1 else: value = 0 redis_conn.set(key, value) thread_name = threading.current_thread().name print(thread_name, value) if thread_name == "Thread-2": print("thread-2 crash ....") import sys sys.exit(1) lock.del_lock(key) #释放锁
代码执行结果:
Thread-2 3 Thread-2 crash.. .....
线程 2 crash 以后,后续的线程一直获取不了锁,便一直处于等待锁的状态,因而乎产生了死锁。若是数据是多线程处理的,好比每来一个数据就开一个线程去处理,那么堆积的线程会逐渐增多,最终可能会致使系统崩溃。
使用了 redis 来实现分布式锁,何不利用 redis 的 ttl 机制呢,给锁加上过时时间。
代码修改成:
def get_lock(self, key, timeout=1): lock_key = self.get_lock_key(key) while True: value = self.redis_conn.set(lock_key, 1, nx=True, ex=timeout) if value: break else: print("waiting....") time.sleep(0.1)
执行结果:
('Thread-1', 0) ('Thread-10', 1) ('Thread-8', 2) ('Thread-3', 3) ('Thread-2', 4) thread-2 crash .... ('Thread-7', 5) waiting.... waiting.... waiting.... ('Thread-9', 6) ('Thread-6', 7) ('Thread-4', 8) ('Thread-5', 9)
结果正确,线程 2 在 crash 后,其余线程在等待,直到锁过时。
进行到这里,彷佛已经能够解决数据不一致的问题了,但在欢喜之余,不妨多想一想会不会出现其余问题。好比假设 A 进程的逻辑还没处理完,可是锁因为过时时间到了,致使锁自动释放掉,这时 B 线程得到了锁,开始处理 B 的逻辑,而后 A 进程的逻辑处理完了,就把 B 进程的锁给删除了。
方式三:锁的生成和删除必须是同一个线程
先修改代码,设置代码的执行时间大于ttl时间
def increase_data(redis_conn, lock, key): lock_value = lock.get_lock(key) #获取锁 value = redis_conn.get(key) #获取数据 time.sleep(2.5) #模拟实际状况下进行的某些耗时操做, 且执行时间大于锁过时的时间 if value: value = int(value) + 1 else: value = 0 redis_conn.set(key, value) thread_name = threading.current_thread().name print(thread_name, value) if thread_name == "Thread-2": print("thread-2 crash ....") import sys sys.exit(1) lock.del_lock(key) #释放锁 执行结果: ('Thread-1', 0) Thread-5 is waiting.. ('Thread-4', 0) ('Thread-2', 1) thread-2 crash .... ('Thread-3', 1) ('Thread-5', 1)
从以上结果能够看出,因为每一个线程的执行时间大于锁的过时时间,当线程的任务还没执行完时,锁已经自动释放,使得下一个线程得到了锁,然后下一个线程的锁被上一个执行完了的线程删掉或者也是自动释放(具体要看线程的执行时间和锁的释放时间),因而又产生了同一个数据被两个或多个线程同时修改的问题,致使数据出现不一致。
咱们用四个线程,按照时间顺序画的流程图以下:
能够看到,在 2.5s 和 5s 的时刻,都产生了误删锁的状况。
既然这个现象是因为锁过时致使误删别人家的锁引起的,那咱们就顺着这个思路,强制线程只能删除本身设置的锁。若是是这样,就得被每一个线程的锁添加一个惟一标识了。看看上面的锁机制,咱们每次添加锁的时候,都是给 lock_key 设为 1,不管是 key 仍是 value,都不具有惟一性,若是把 key 设为每一个线程惟一的,那在分布式系统中,得产生 N (等于总线程数)个 key 了 ,从直观性和维护性上来讲,这都是不可取的,因而乎只能从 value 入手了。咱们看到每一个线程均可以取到一个惟一标识,即线程 ID,若是加上进程的 PID,以及机器的 IP,就能够构成一个线程锁的惟一标识了,若是还担忧不够惟一,再打上一个时间戳了,因而乎,咱们的分布式锁最终版就变成了如下这样:
class RedisLock(object): def __init__(self, redis_conn): self.redis_conn = redis_conn self.ip = socket.gethostbyname(socket.gethostname()) self.pid = os.getpid() def get_lock_key(self, key): lock_key = 'lock_%s' %key return lock_key def gen_unique_value(self): thread_name = threading.current_thread().name time_now = time.time() unique_value = "{0}-{1}-{2}-{3}".format(self.ip, self.pid, thread_name, time_now) return unique_value def get_lock(self, key, timeout=1): lock_key = self.get_lock_key(key) unique_value = self.gen_unique_value() print("unique value %s" % unique_value) while True: value = self.redis_conn.set(lock_key, 1, nx=True, ex=timeout) if value: return unique_value else: thread_name = threading.current_thread().name print("{} is waiting..".format(thread_name)) time.sleep(0.1) def del_lock(self, key, value): lock_key = self.get_lock_key(key) old_lock_value = self.redis_conn.get(lock_key) if old_lock_value == value: return self.redis_conn.delete(lock_key) 修改测试代码: def increase_data(redis_conn, lock, key): lock_value = lock.get_lock(key) #获取锁 value = redis_conn.get(key) #获取数据 time.sleep(2.5) #模拟实际状况下进行的某些耗时操做, 且执行时间大于锁过时的时间 if value: value = int(value) + 1 else: value = 0 redis_conn.set(key, value) thread_name = threading.current_thread().name print(thread_name, value) if thread_name == "Thread-2": print("thread-2 crash ....") import sys sys.exit(1) lock.del_lock(key, lock_value) #释放锁
运行结果:
unique value 192.168.1.110-45351-Thread-1-1555730398.38 unique value 192.168.1.110-45351-Thread-2-1555730398.39 unique value 192.168.1.110-45351-Thread-3-1555730398.4 unique value 192.168.1.110-45351-Thread-4-1555730398.42 unique value 192.168.1.110-45351-Thread-5-1555730398.43 ('Thread-1', 0) Thread-3 is waiting.. Thread-2 is waiting.. Thread-3 is waiting.. Thread-2 is waiting.. ('Thread-4', 0) Thread-3 is waiting.. ('Thread-5', 0) ('Thread-3', 1) ('Thread-2', 1) thread-2 crash ....
以上能够看出,问题没有获得解决。由于什么缘由呢?以上咱们设置值的惟一性只能确保线程不会误删其余线程产生的锁,进而出现连串的误删锁的状况,好比 A 删了 B 的锁,B 执行完删了 C 的锁 。使用 redis 的过时机制,只要业务的处理时间大于锁的过时时间,就没有一个很好的方式来避免因为锁过时致使其余线程同时占有锁的问题,因此须要熟悉业务的执行时间,来合理地设置锁的过时时间。
还需注意的一点是,以上的实现方式中,删除锁(del_lock)的操做不是原子性的,先是拿到锁,再判断锁的值是否相等,相等的话最后再删除锁,既然不是原子性的,就有可能存在这样一种极端状况:在判断的那一时刻,锁正好过时了,被其余线程占有了锁,那最后一步的删除,就可能会形成误删锁了。可使用官方推荐的 Lua 脚原本确保原子性:
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
可是只要锁的过时时间设置的足够合理,这个问题实际上是能够忽略的,也能够说出现这种极端状况的几率是及其小的。
以上咱们使用 redis 来实现一个分布式的同步锁,来保证数据的一致性,其特色是:
知足互斥性,同一个时刻只能有一个线程能够获取锁
利用 redis 的 ttl 来确保不会出现死锁,但同时也会带来因为锁过时引起的多线程同时占有锁的问题,须要咱们合理设置锁的过时时间来避免
利用锁的惟一性来确保不会出现误删锁的状况
以上的方案中,咱们是假设 redis 服务端是单集群且高可用的,忽视了如下的问题:若是某一时刻 redis master 节点发生了故障,集群中的某个 slave 节点变成 master 节点,这时候就可能出现原 master 节点上的锁没有及时同步到 slave 节点,致使其余线程同时得到锁。对于这个问题,能够参考 redis 官方推出的 redlock 算法,可是比较遗憾的是,该算法也没有很好地解决锁过时的问题。
————————————————版权声明:本文为CSDN博主「达西布鲁斯」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处连接及本声明。原文连接:https://blog.csdn.net/biheyu828/article/details/89005866