标准库queue模块,提供FIFO的queue、LIFO的队列,优先队列
Queue 类是线程安全的,适用于多线程间安全的交换数据,内部使用了Lock和Condition python
为何说容器的大小不许确,其缘由是若是不加锁,是不可能获取到准确的大小的,由于你刚读取了一个大小,还没取走,有可能被就被其余线程修改了,queue类的size虽然加了锁,可是依然不能保证当即get,put就能成功,由于读取大小和get,put方法是分来的。nginx
全局解释器锁,进程级别的锁GIL
Cpython在解释器进程中有一把锁,叫作GIL全局解释器锁。编程GIL 保证Cpython进程中,当前时刻只有一个线程执行代码,甚至在多核状况下,也是如此。安全
Cpython中
IO 密集型,因为线程阻塞,就会调度其余线程
CPU密集型,当前线程可能连续获取GIL,致使其余线程几乎没法使用CPU,若要唤醒其余线程,则须要准备数据,其代价是高昂的。服务器
IO 密集型,多线程解决,CPU密集型,多进程解决,绕开GIL。markdown
python中绝大多数内置数据结构的读写操做都是原子操做网络
因为GIL 的存在,python的内置数据类型在多线程编程的时候就变得安全了,可是实际上他们自己不是线程安全类型的数据结构
Guido坚持的简单哲学,对于初学者门槛低,不须要高深的系统知识也能安全,简单的使用python。
而移除GIL。会下降Cpython单线程的执行效率。多线程
相关实例并发
import logging import datetime logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s ") start=datetime.datetime.now() def calc(): sum=0 for _ in range(1000000000): sum+=1 calc() calc() calc() calc() calc() delta=(datetime.datetime.now()-start).total_seconds() logging.info(delta)
多线程模式下的计算结果
import logging import datetime import threading logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s ") start=datetime.datetime.now() def calc(): sum=0 for _ in range(1000000000): sum+=1 lst=[] for _ in range(5): t=threading.Thread(target=calc) t.start() lst.append(t) for t in lst: t.join() delta=(datetime.datetime.now()-start).total_seconds() print (delta)
结果以下
从这两个程序来看,Cpython中多线程根本没有优点,和一个线程执行的时间至关,由于存在GIL
因为python中的GIL ,多线程不是CPU密集型程序的最好选择
多进程能够在彻底独立的进程中运行程序,能够充分利用多处理器
可是进程自己的隔离带来数据不共享也是一个问题,且线程比进程轻量的多
多进程也是解决并发的一种手段
相同点:
进程是能够终止的,线程是不能经过命令终止的,线程的终止要么抛出异常,要么程序自己执行完成。
进程间同步提供了和线程同步同样的类,使用方式也是同样的,使用效果也是相似,不过,进程间同步的代价要高于线程,并且底层实现不一样。
multiprocessing 还提供了共享内存,服务器进程来共享数据,还提供了queue队列,匹配管道用于进程间通讯
不一样点
通讯方式不一样
1 多进程就是启用多个解释器进程,进程间通讯必须序列化,反序列化
2 数据的安全性问题多进程最好是在main中执行
多线程已经将数据进行处理了,其不须要再次进行序列化了多进程传递必须序列化和反序列化。
远程调用,RPC,跨网络
multiprocessing中的process类
process 类遵循了Thread类的API,减小了学习难度
不一样进程能够彻底调度到不一样的CPU上执行IO 密集型最好使用多线程
CPU 密集型最好使用多进程进程提供的相关属性
名称 | 含义 |
---|---|
pid | 进程ID |
exitcode | 进程退出的状态码 |
terminate() | 终止指定进程 |
import logging import datetime import multiprocessing logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s ") start=datetime.datetime.now() def calc(i): sum=0 for _ in range(1000000000): sum+=1 lst=[] for i in range(5): p=multiprocessing.Process(target=calc,args=(i,),name="P-{}".format(i)) p.start() lst.append(p) for p in lst: p.join() delta=(datetime.datetime.now()-start).total_seconds() print (delta)
结果以下
多进程自己避开了进程和进程之间调度须要的时间,多核心都使用了,此处存在CPU的调度问题
多进程对CPU的提高是显而易见的。
单线程,多线程都跑了很长时间,而多进程只是用了1分半,是真正的并行
import logging import datetime import multiprocessing logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s ") start=datetime.datetime.now() def calc(i): sum=0 for _ in range(1000000000): sum+=1 print (i,sum) if __name__=='__main__': start=datetime.datetime.now() p=multiprocessing.Pool(5) # 此处用于初始化进程池,其池中的资源是能够复用的 for i in range(5): p.apply_async(calc,args=(i,)) p.close() # 下面要执行join,上面必须先close p.join() delta=(datetime.datetime.now()-start).total_seconds() print (delta)
结果以下
进程建立的多,使用进程池进行处理仍是一种比较好的处理方式
1 CPU 密集型
Cpython 中使用了GIL,多线程的时候互相竞争,且多核优点不能发挥,python使用多进程效率更高2 IO密集型
适合使用多线程,减小IO序列化开销,且在IO等待时,切换到其余线程继续执行,效率不错,固然多进程也适用于IO密集型
请求/应答模型: WEB应用中常见的处理模型
master启动多个worker工做进程,通常和CPU数目相同
worker工做进程中启动多个线程,提升并发处理能力,worker处理用户的请求,每每须要等待数据
这就是nginx的工做模式工做进程通常都和CPU核数相同,CPU的亲原性,进程在CPU的迁移成本比较高。
concurrent.futures
3.2 版本引入的模块
异步并行任务编程模块,提供一个高级的异步可执行的便利接口提供了2个池执行器
ThreadPoolExecutor 异步调用的线程池的Executor
ProcessPoolExecutor 异步调用进程池的Executor
方法 | 含义 |
---|---|
ThreadPoolExecutor(max_workers=1) | 池中至多建立max_workers个线程的池来同时异步执行,返回Executor实例 |
submit(fn,*args,**kwagrs) | 提交执行的函数及参数,返回Future实例 |
shutdown(wait=True) | 清理池 |
Future 类
方法 | 含义 |
---|---|
result() | 能够查看调用的返回结果 |
done() | 若是调用被成功的取消或者执行完成,则返回为True |
cancelled() | 若是调用被成功取消,返回True |
running() | 若是正在运行且不能被取消,则返回True |
cancel() | 尝试取消调用,若是已经执行且不能取消则返回False,不然返回True |
result(timeout=None) | 取返回的结果,超时时为None,一直等待返回,超时设置到期,抛出concurrent.futures.TimeoutError异常 |
execption(timeout=None) | 取返回的异常,超时为None,一直等待返回,超时设置到期,抛出concurrent.futures.TimeoutError异常 |
import logging import threading from concurrent import futures import logging import time logging.basicConfig(level=logging.INFO,format="%(asctime)-15s\t [%(processName)s:%(threadName)s,%(process)d:%(thread)8d] %(message)s") def worker(n): # 定义将来执行的任务 logging.info("begin to work{}".format(n)) time.sleep(5) logging.info("finished{}".format(n)) # 建立一个线程池,池容量为3 executor=futures.ThreadPoolExecutor(max_workers=3) fs=[] for i in range(3): f=executor.submit(worker,i) # 传入参数,返回Future对象 fs.append(f) for i in range(3,6): f=executor.submit(worker,i) # 传入参数,返回Future对象 fs.append(f) while True: time.sleep(2) logging.info(threading.enumerate()) #返回存活线程列表 flag=True for f in fs: logging.info(f.done()) # 若是被成功调用或取消完成,此处返回为True flag=flag and f.done() # 若都调用成功,则返回为True,不然则返回为False if flag: executor.shutdown() # 若是所有调用成功,则须要清理池 logging.info(threading.enumerate()) break
结果以下
其线程池中的线程是持续使用的,一旦建立好的线程,其不会变化,惟一很差的就是线程名未发生变化,但其最多影响了打印效果
import logging import threading from concurrent import futures import logging import time logging.basicConfig(level=logging.INFO,format="%(asctime)-15s\t [%(processName)s:%(threadName)s,%(process)d:%(thread)8d] %(message)s") def worker(n): # 定义将来执行的任务 logging.info("begin to work{}".format(n)) time.sleep(5) logging.info("finished{}".format(n)) # 建立一个进程池,池容量为3 executor=futures.ProcessPoolExecutor(max_workers=3) fs=[] for i in range(3): f=executor.submit(worker,i) # 传入参数,返回Future对象 fs.append(f) for i in range(3,6): f=executor.submit(worker,i) # 传入参数,返回Future对象 fs.append(f) while True: time.sleep(2) flag=True for f in fs: logging.info(f.done()) # 若是被成功调用或取消完成,此处返回为True flag=flag and f.done() # 若都调用成功,则返回为True,不然则返回为False if flag: executor.shutdown() # 若是所有调用成功,则须要清理池 break
结果以下
concurrent.futures.ProcessPoolExecutor 继承自concurrent.futures.base.Executor,而父类有enter,_exit方法,其是支持上下文管理的,可使用with语句
import logging import threading from concurrent import futures import logging import time logging.basicConfig(level=logging.INFO,format="%(asctime)-15s\t [%(processName)s:%(threadName)s,%(process)d:%(thread)8d] %(message)s") def worker(n): # 定义将来执行的任务 logging.info("begin to work{}".format(n)) time.sleep(5) logging.info("finished{}".format(n)) fs=[] with futures.ProcessPoolExecutor(max_workers=3) as executor: for i in range(6): futures=executor.submit(worker,i) fs.append(futures) while True: time.sleep(2) flag=True for f in fs: logging.info(f.done()) # 若是被成功调用或取消完成,此处返回为True flag=flag and f.done() # 若都调用成功,则返回为True,不然则返回为False if flag: executor.shutdown() # 若是所有调用成功,则须要清理池 break
结果以下
统一了线程池,进程池的调用,简化了编程,是python简单的思想哲学的提现惟一缺点: 没法设置线程名称