Python实现线程安全队列

   最近学习spark,我主要使用pyspark api进行编程。python

以前使用Python都是现学现用,用完就忘了也没有理解和记忆,所以这里把Python相关的知识也弥补和记录下来吧编程

多线程任务队列在实际项目中很是有用,关键的地方要实现队列的多线程同步问题,也即保证队列的多线程安全api

例如:能够开多个消费者线程,每一个线程上绑定一个队列,这样就实现了多个消费者同时处理不一样队列上的任务安全

同时能够有多个生产者往队列发送消息,实现异步消息处理多线程

先复习下互斥量条件变量的概念:异步

互斥量(mutex)从本质上说是一把锁,在访问共享资源前对互斥量进行加锁,在访问完成后释放互斥量上的锁。对互斥量进行加锁之后,任何其余试图再次对互斥锁加锁的线程将会阻塞直到当前线程释放该互斥锁。若是释放互斥锁时有多个线程阻塞,全部在该互斥锁上的阻塞线程都会变成可运行状态,第一个变为运行状态的线程能够对互斥锁加锁,其余线程将会看到互斥锁依然被锁住,只能回去再次等待它从新变为可用。函数

条件变量(cond)是在多线程程序中用来实现”等待–》唤醒”逻辑经常使用的方法。条件变量利用线程间共享的全局变量进行同步的一种机制,主要包括两个动做:一个线程等待”条件变量的条件成立”而挂起;另外一个线程使“条件成立”。为了防止竞争,条件变量的使用老是和一个互斥锁结合在一块儿。线程在改变条件状态前必须首先锁住互斥量,函数pthread_cond_wait把本身放到等待条件的线程列表上,而后对互斥锁解锁(这两个操做是原子操做)。在函数返回时,互斥量再次被锁住学习

条件变量老是与互斥锁一块儿使用的测试

Python的threading中定义了两种锁:threading.Lock和threading.RLockspa

二者的不一样在于后者是可重入锁,也就是说在一个线程内重复LOCK同一个锁不会发生死锁,这与POSIX中的PTHREAD_MUTEX_RECURSIVE也就是可递归锁的概念是相同的, 互斥锁的API有三个函数,分别执行分配锁,上锁,解锁操做。

python的threading中的条件变量默认绑定了一个RLock,也能够在初始化条件变量的时候传进去一个本身定义的锁.

最后贴出我本身实现的简单线程安全任务队列

测试代码

相关文章
相关标签/搜索