假若有两个程序A和B,程序A在执行到一半的过程当中,须要读取大量的数据输入(I/O操做),而此时CPU只能静静地等待任务A读取完数据才能继续执行,这样就白白浪费了CPU资源。 是否是在程序A读取数据的过程当中,让程序B去执行,当程序A读取完数据以后,让 程序B暂停,而后让程序A继续执行? 固然没问题,但这里有一个关键词:切换。html
既然是切换,那么这就涉及到了状态的保存,状态的恢复,加上程序A与程序B所须要的系统资 源(内存,硬盘,键盘等等)是不同的。天然而然的就须要有一个东西去记录程序A和程序B 分别须要什么资源,怎样去识别程序A和程序B等等,因此就有了一个叫进程的抽象 python
进程定义: 进程就是一个程序在一个数据集上的一次动态执行过程。 数据库
进程通常由 程序、数据集、进程控制块 三部分组成。 api
线程的出现是为了下降上下文切换的消耗,提升系统的并发性,并突破一个进程只能干同样事的缺陷,使到进程内并发成为可能。
假设,一个文本程序,须要接受键盘输入,将内容显示在屏幕上,还须要保存信息到硬盘中。若只有一个进程,势必形成同一时间只能干同样事的尴尬(当保存时,就不能经过键盘输入内容)。
如有多个进程,每一个进程负责一个任务,进程A负责接收键盘输入的任务,进程B负责将内容显示在屏幕上的任务,进程C负责保存内容到硬盘中的任务。这里进程A,B,C间的协做涉及到了进程通讯问题,并且有共同都须要拥有的东西-------文本内容,不停的切换形成性能上的损失。
如有一种机制,可使任务A,B,C共享资源,这样上下文切换所须要保存和恢复的内容就少了,同时又能够减小通讯所带来的性能损耗,那就行了。是的,这种机制就是线程。缓存
线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程当中的最小单元,由线程ID、程序计数器、寄存器集合和堆栈共同组成。线程的引入减少了程序并发执行时的开销,提升了操做系统的并发性能。线程没有本身的系统资源。
- Threads share the address space of the process that created it; processes have their own address space.
- Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process.
- Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes.
- New threads are easily created; new processes require duplication of the parent process.
- Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
- Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes.
- 一个程序至少有一个进程,一个进程至少有一个线程.(进程能够理解成线程的容器)
- 进程在执行过程当中拥有独立的内存单元,而多个线程共享内存,从而极大地提升了程序的运行效率。
- 线程在执行过程当中与进程仍是有区别的。每一个独立的线程有一个程序运行的入口、顺序执行序列和程序的出口。可是线程不可以独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。
- 进程是具备必定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位.
- 线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.
- 线程本身基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈)可是它可与同属一个进程的其余的线程共享进程所拥有的所有资源.
- 一个线程能够建立和撤销另外一个线程;同一个进程中的多个线程之间能够并发执行.
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)安全
上面的核心意思就是,不管你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只容许一个线程运行。数据结构
首先须要明确的一点是GIL并非Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就比如C++是一套语言(语法)标准,可是能够用不一样的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也同样,一样一段代码能够经过CPython,PyPy,Psyco等不一样的Python执行环境来执行。像其中的JPython就没有GIL。
然而由于CPython是大部分环境下默认的Python执行环境。因此在不少人的概念里CPython就是Python,也就想固然的把GIL归结为Python语言的缺陷。因此这里要先明确一点:GIL并非Python的特性,Python彻底能够不依赖于GIL。多线程
这篇文章透彻的剖析了GIL对python多线程的影响,强烈推荐看一下:http://www.dabeaz.com/python/UnderstandingGIL.pdf
threading 模块创建在thread 模块之上。thread模块以低级、原始的方式来处理和控制线程,而threading 模块经过对thread进行二次封装,提供了更方便的api来处理线程。并发
直接调用:app
import threading import time def sayhi(num): #定义每一个线程要运行的函数 print("running on number:%s" %num) time.sleep(3) if __name__ == '__main__': t1 = threading.Thread(target=sayhi,args=(1,),name="name_t1") #生成一个线程实例 t2 = threading.Thread(target=sayhi,args=(2,),name="name_t2") #生成另外一个线程实例 t1.start() #启动线程 t2.start() #启动另外一个线程 print(t1.getName()) #获取线程名 等同于 print(t1.name) print(t2.getName()) print(threading.current_thread().name) >>> running on number:1 running on number:2 name_t1 name_t2 MainThread
因为任何进程默认就会启动一个线程,咱们把该线程称为主线程,主线程又能够启动新的线程,Python的 threading 模块有个 current_thread()
函数,它永远返回当前线程的实例。主线程实例的名字叫 MainThread ,子线程的名字在建立时指定,咱们用LoopThread
命名子线程。名字仅仅在打印时用来显示,彻底没有其余意义,若是不起名字Python就自动给线程命名为Thread-1
,Thread-2
……
args里面是参数
继承式调用:
import threading import time class MyThread(threading.Thread): def __init__(self,num): threading.Thread.__init__(self) # 等同于 super().__init__() self.num = num def run(self):#定义每一个线程要运行的函数 print("running on number:%s" %self.num) time.sleep(3) if __name__ == '__main__': t1 = MyThread(1) t2 = MyThread(2) t1.start() t2.start() print("ending......")
Join & Daemon
#_*_coding:utf-8_*_ __author__ = 'Alex Li' import time import threading def run(n): print('[%s]------running----\n' % n) time.sleep(2) print('--done--') def main(): for i in range(5): t = threading.Thread(target=run,args=[i,]) t.start() t.join(1) print('starting thread', t.getName()) m = threading.Thread(target=main,args=[]) # m.setDaemon(True) #将main线程设置为Daemon线程,它作为程序主线程的守护线程,当主线程退出时,m线程也会退出,由m启动的其它子线程会同时退出,无论是否执行完任务 m.start() m.join(timeout=2) print("---main thread done----") >>> [0]------running---- starting thread Thread-2 [1]------running---- ---main thread done---- --done-- starting thread Thread-3 [2]------running---- --done-- starting thread Thread-4 [3]------running---- --done-- starting thread Thread-5 [4]------running---- --done-- starting thread Thread-6 --done-- ***Repl Closed*** # 加了守护程序以后 >>> [0]------running---- starting thread Thread-2 [1]------running---- ---main thread done---- --done-- ***Repl Closed***
join():
在子线程完成运行以前,这个子线程的父线程将一直被阻塞。
setDaemon(True):
将线程声明为守护线程,必须在start() 方法调用以前设置, 若是不设置为守护线程程序会被无限挂起。这个方法基本和join是相反的。
当咱们 在程序运行中,执行一个主线程,若是主线程又建立一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成
想退出时,会检验子线程是否完成。如 果子线程未完成,则主线程会等待子线程完成后再退出。可是有时候咱们须要的是 只要主线程
完成了,无论子线程是否完成,都要和主线程一块儿退出,这时就能够 用setDaemon方法啦。
# run(): 线程被cpu调度后自动执行线程对象的run方法 # start():启动线程活动。 # isAlive(): 返回线程是否活动的。 # getName(): 返回线程名。 # setName(): 设置线程名。 threading模块提供的一些方法: # threading.currentThread(): 返回当前的线程变量。 # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
一个进程下能够启动多个线程,多个线程共享父进程的内存空间,也就意味着每一个线程能够访问同一份数据,此时,若是2个线程同时要修改同一份数据,会出现什么情况?
import time import threading def addNum(): global num #在每一个线程中都获取这个全局变量 #num-=1 temp=num #print('--get num:',num ) time.sleep(0.1) num =temp-1 #对此公共变量进行-1操做 num = 100 #设定一个共享变量 thread_list = [] for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待全部线程执行完毕 t.join() print('final num:', num )
观察:time.sleep(0.1) /0.001/0.0000001 结果分别是多少?
假设你有A,B两个线程,此时都 要对num 进行减1操做, 因为2个线程是并发同时运行的,因此2个线程颇有可能同时拿走了num=100这个初始变量交给cpu去运算,当A线程去处完的结果是99,但此时B线程运算完的结果也是99,两个线程同时CPU运算的结果再赋值给num变量后,结果就都是99。那怎么办呢? 很简单,每一个线程在要修改公共数据时,为了不本身在还没改完的时候别人也来修改此数据,能够给这个数据加一把锁, 这样其它线程想修改此数据时就必须等待你修改完毕并把锁释放掉后才能再访问此数据。
多个线程都在同时操做同一个共享资源,因此形成了资源破坏,怎么办呢?(join会形成串行,失去所线程的意义)
咱们能够经过 同步锁 来解决这种问题。
锁的概念是这样的,当一个线程lock.acquire()后,能够切换到另外一个线程,可是当另外一个线程执行到lock.acquire()后,发现已经锁住了,因此要等到
另外一个进程解锁之后,才能进行加锁。因此lock = threaing.Lock() 是个全局锁。这样lock能够在多个线程中判断
import time import threading def addNum(): global num #在每一个线程中都获取这个全局变量 print('--get num:',num ) time.sleep(1) lock.acquire() #修改数据前加锁 num -=1 #对此公共变量进行-1操做 lock.release() #修改后释放 num = 100 #设定一个共享变量 thread_list = [] lock = threading.Lock() #生成全局锁 for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待全部线程执行完毕 t.join() print('final num:', num )
Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为何这里还须要lock? 注意啦,这里的lock是用户级的lock,跟那个GIL不要紧 ,具体经过下图来看一下。
那你又问了, 既然用户程序已经本身有锁了,那为何C python还须要GIL呢?加入GIL主要的缘由是为了下降程序的开发的复杂度,好比如今的你写python不须要关心内存回收的问题,由于Python解释器帮你自动按期进行内存回收,你能够理解为python解释器里有一个独立的线程,每过一段时间它起wake up作一次全局轮询看看哪些内存数据是能够被清空的,此时你本身的程序 里的线程和 py解释器本身的线程是并发运行的,假设你的线程删除了一个变量,py解释器的垃圾回收线程在清空这个变量的过程当中的clearing时刻,可能一个其它线程正好又从新给这个还没来及得清空的内存空间赋值了,结果就有可能新赋值的数据被删除了,为了解决相似的问题,python解释器简单粗暴的加了锁,即当一个线程运行时,其它人都不能动,这样就解决了上述的问题, 这能够说是Python早期版本的遗留问题。
在线程间共享多个资源的时候,若是两个线程分别占有一部分资源而且同时等待对方的资源,就会形成死锁,由于系统判断这部分资源都正在使用,全部这两个线程在无外力做用下将一直等待下去。下面是一个死锁的例子:
import threading,time class myThread(threading.Thread): def doA(self): lockA.acquire() print(self.name,"gotlockA",time.ctime()) time.sleep(3) lockB.acquire() print(self.name,"gotlockB",time.ctime()) lockB.release() lockA.release() def doB(self): lockB.acquire() print(self.name,"gotlockB",time.ctime()) time.sleep(2) lockA.acquire() print(self.name,"gotlockA",time.ctime()) lockA.release() lockB.release() def run(self): self.doA() self.doB() if __name__=="__main__": lockA=threading.Lock() lockB=threading.Lock() threads=[] for i in range(5): threads.append(myThread()) for t in threads: t.start() for t in threads: t.join()#等待线程结束,后面再讲。
解决办法:使用递归锁,将
lockA=threading.Lock() lockB=threading.Lock()<br>#--------------<br>lock=threading.RLock()
为了支持在同一线程中屡次请求同一资源,python提供了“可重入锁”:threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源能够被屡次acquire。直到一个线程全部的acquire都被release,其余的线程才能得到资源。
import time import threading class Account: def __init__(self, _id, balance): self.id = _id self.balance = balance self.lock = threading.RLock() def withdraw(self, amount): with self.lock: self.balance -= amount def deposit(self, amount): with self.lock: self.balance += amount def drawcash(self, amount):#lock.acquire中嵌套lock.acquire的场景 with self.lock: interest=0.05 count=amount+amount*interest self.withdraw(count) def transfer(_from, to, amount): #锁不能够加在这里 由于其余的其它线程执行的其它方法在不加锁的状况下数据一样是不安全的 _from.withdraw(amount) to.deposit(amount) alex = Account('alex',1000) yuan = Account('yuan',1000) t1=threading.Thread(target = transfer, args = (alex,yuan, 100)) t1.start() t2=threading.Thread(target = transfer, args = (yuan,alex, 200)) t2.start() t1.join() t2.join() print('>>>',alex.balance) print('>>>',yuan.balance)
python线程的事件用于主线程控制其余线程的执行,事件主要提供了三个方法wait、clear、set.
事件处理的机制:全局定义了一个“Flag”,若是“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,若是“Flag”值为True,那么event.wait 方法时便再也不阻塞。
clear:将“Flag”设置为False
set:将“Flag”设置为True
用 threading.Event 实现线程间通讯。
import threading,time class Boss(threading.Thread): def run(self): print("BOSS:今晚你们都要加班到22:00。") print(event.isSet()) event.set() time.sleep(5) print("BOSS:<22:00>能够下班了。") print(event.isSet()) event.set() class Worker(threading.Thread): def run(self): event.wait() print("Worker:哎……命苦啊!") time.sleep(1) event.clear() event.wait() print("Worker:OhYeah!") if __name__=="__main__": event=threading.Event() threads=[] for i in range(5): threads.append(Worker()) threads.append(Boss()) for t in threads: t.start() for t in threads: t.join()
若是一个线程须要不停重复的使用event对象,最好使用condition对象实现一个周期定时器,每当定时器超时的时候,其余线程均可以检测到:
当小伙伴a在往火锅里面添加鱼丸,这个就是生产者行为;另一个小伙伴b在吃掉鱼丸就是消费者行为。当火锅里面鱼丸达到必定数量加满后b才能吃,这就是一种条件判断了。
Condition(条件变量)一般与一个锁关联。须要在多个Contidion中共享一个锁时,能够传递一个Lock/RLock实例给构造方法,不然它将本身生成一个RLock实例。
能够认为,除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于状态图中的等待阻塞状态,直到另外一个线程调用notify()/notifyAll()通知;获得通知后线程进入锁定池等待锁定。
Condition():
实现场景:当a同窗王火锅里面添加鱼丸加满后(最多5个,加满后通知b去吃掉),通知b同窗去吃掉鱼丸(吃到0的时候通知a同窗继续添加)
# coding=utf-8 import threading import time con = threading.Condition() num = 0 # 生产者 class Producer(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): # 锁定线程 global num con.acquire() while True: print("开始添加!!!") num += 1 print("火锅里面鱼丸个数:%s" % str(num)) time.sleep(1) if num >= 5: print("火锅里面里面鱼丸数量已经到达5个,没法添加了!") # 唤醒等待的线程 con.notify() # 唤醒小伙伴开吃啦 # 等待通知 con.wait() #当while退出则开锁。此程序中运行不到这句话 con.release() # 消费者 class Consumers(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): con.acquire() global num while True: print("开始吃啦!!!") num -= 1 print("火锅里面剩余鱼丸数量:%s" %str(num)) time.sleep(2) if num <= 0: print("锅底没货了,赶忙加鱼丸吧!") con.notify() # 唤醒其它线程 # 等待通知 con.wait() # 当while退出则开锁。此程序中运行不到这句话 con.release() p = Producer() c = Consumers() p.start() c.start()
信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数 器,每当调用acquire()时-1,调用release()时+1。
计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其余线程调用release()。(相似于停车位的概念)
BoundedSemaphore与Semaphore的惟一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,若是超过了将抛出一个异常。
import threading,time class myThread(threading.Thread): def run(self): if semaphore.acquire(): print(self.name) # getname() 线程的名称 time.sleep(5) semaphore.release() if __name__=="__main__": semaphore=threading.Semaphore(5) thrs=[] for i in range(100): thrs.append(myThread()) for t in thrs: t.start()
定时器,指定n秒后执行某操做
from threading import Timer def hello(): print("hello, world") t = Timer(1, hello) t.start() # after 1 seconds, "hello, world" will be printed
import threading,time li=[1,2,3,4,5] def pri(): while li: a=li[-1] print(a) time.sleep(1) try: li.remove(a) except Exception as e: print('----',a,e) t1=threading.Thread(target=pri,args=()) t1.start() t2=threading.Thread(target=pri,args=()) t2.start()
多线程进行操做时,li[-1]有可能都表明5,而不是递减的。
思考:如何经过对列来完成上述功能?
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
建立一个“队列”对象 import queue q = queue.Queue(maxsize = 10) queue.Queue类便是一个队列的同步实现。队列长度可为无限或者有限。可经过Queue的构造函数的可选参数maxsize来设定队列长度。若是maxsize小于1就表示队列长度无限。 将一个值放入队列中 q.put(10) 调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为 1。若是队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。若是block为0,put方法将引起Full异常。 将一个值从队列中取出 q.get() 调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。若是队列为空且block为True, get()就使调用线程暂停,直至有项目可用。若是队列为空且block为False,队列将引起Empty异常。 Python queue模块有三种队列及构造函数: 一、Python queue模块的FIFO队列先进先出。 class queue.Queue(maxsize) 二、LIFO相似于堆,即先进后出。 class queue.LifoQueue(maxsize) 三、还有一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize) 此包中的经常使用方法(q = queue.Queue()): q.qsize() 返回队列的大小 q.empty() 若是队列为空,返回True,反之False q.full() 若是队列满了,返回True,反之False q.full 与 maxsize 大小对应 q.get([block[, timeout]]) 获取队列,timeout等待时间 q.get_nowait() 至关q.get(False) 非阻塞 q.put(item) 写入队列,timeout等待时间 q.put_nowait(item) 至关q.put(item, False) q.task_done() 在完成一项工做以后,q.task_done() 函数向任务已经完成的队列发送一个信号 q.join() 实际上意味着等到队列为空,再执行别的操做
other mode:
import queue #先进后出 q=queue.LifoQueue() q.put(34) q.put(56) q.put(12) #优先级 # q=queue.PriorityQueue() # q.put([5,100]) # q.put([7,200]) # q.put([3,"hello"]) # q.put([4,{"name":"alex"}]) while 1: data=q.get() print(data)
生产者消费者模型:
为何要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题因而引入了生产者和消费者模式。
什么是生产者消费者模式
生产者消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而经过阻塞队列来进行通信,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力。
这就像,在餐厅,厨师作好菜,不须要直接和客户交流,而是交给前台,而客户去饭菜也不须要不找厨师,直接去前台领取便可,这也是一个结耦的过程。
import time,random import queue,threading q = queue.Queue() def Producer(name): count = 0 while count <10: print("making........") time.sleep(random.randrange(3)) q.put(count) print('Producer %s has produced %s baozi..' %(name, count)) count +=1 #q.task_done() #q.join() print("ok......") def Consumer(name): count = 0 while count <10: time.sleep(random.randrange(4)) if not q.empty(): data = q.get() #q.task_done() #q.join() print(data) print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data)) else: print("-----no baozi anymore----") count +=1 p1 = threading.Thread(target=Producer, args=('A',)) c1 = threading.Thread(target=Consumer, args=('B',)) # c2 = threading.Thread(target=Consumer, args=('C',)) # c3 = threading.Thread(target=Consumer, args=('D',)) p1.start() c1.start() # c2.start() # c3.start()
import threading,queue class Mythread1(threading.Thread): def __init__(self): super().__init__() def run(self): n = 100 while n>0: n -= 1 print("put进n") q.put(n) class Mythread2(threading.Thread): def __init__(self): super().__init__() def run(self): n = 100 while n>0: n -= 1 print("get出n") q.get(n) if __name__ == '__main__': q = queue.Queue() t1 = Mythread1() t2 = Mythread2() t1.start() t2.start() >>>代码会出现以下结果 put进n put进n put进n put进nget出n get出n get出nput进n get出n put进n put进n put进n put进n put进n put进n get出nput进n get出n get出n get出n put进nget出n get出n get出n
暂时不知道为何。(估计是print的时候自动换行出问题)可是有一个解决方法。加锁。
#! /usr/bin/env python # -*- coding: utf-8 -*- # __author__ = "Q1mi" # Date: 2018/7/12 import threading, queue class Mythread1(threading.Thread): def __init__(self): super().__init__() def run(self): n = 1000 while n > 0: n -= 1 lock.acquire() print("put进n", end="\n") lock.release() q.put(n) class Mythread2(threading.Thread): def __init__(self): super().__init__() def run(self): n = 1000 while n > 0: n -= 1 lock.acquire() print("get出n", end="\n") lock.release() q.get(n) if __name__ == '__main__': lock = threading.Lock() q = queue.Queue() t1 = Mythread1() t2 = Mythread2() t1.start() t2.start()
多进程模块 multiprocessing
M
is a package that supports spawning processes using an API similar to the threading module. The ultiprocessing
package offers both local and remote concurrency,effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing
module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.multiprocessing
因为GIL的存在,python中的多线程其实并非真正的多线程,若是想要充分地使用多核CPU的资源,在python中大部分状况须要使用多进程。
multiprocessing包是Python中的多进程管理包。与threading.Thread相似,它能够利用multiprocessing.Process对象来建立一个进程。该进程能够运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象能够像多线程那样,经过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。因此,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。
from multiprocessing import Process import time def f(name): time.sleep(1) print('hello', name,time.ctime()) if __name__ == '__main__': p_list=[] for i in range(3): p = Process(target=f, args=('alvin',)) p_list.append(p) p.start() for i in p_list: p.join() print('end')
from multiprocessing import Process import time class MyProcess(Process): def __init__(self): super(MyProcess, self).__init__() #self.name = name def run(self): time.sleep(1) print ('hello', self.name,time.ctime()) if __name__ == '__main__': p_list=[] for i in range(3): p = MyProcess() p.start() p_list.append(p) for p in p_list: p.join() print('end')
To show the individual process IDs involved, here is an expanded example:
from multiprocessing import Process import os import time def info(title): print("title:",title) print('parent process:', os.getppid()) print('process id:', os.getpid()) def f(name): info('function f') print('hello', name) if __name__ == '__main__': info('main process line') time.sleep(1) print("------------------") p = Process(target=info, args=('yuan',)) p.start() p.join()
构造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group: 线程组,目前尚未实现,库引用中提示必须是None;
target: 要执行的方法;
name: 进程名;
args/kwargs: 要传入方法的参数。
实例方法:
is_alive():返回进程是否在运行。
join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
start():进程准备就绪,等待CPU调度
run():strat()调用run方法,若是实例进程时未制定传入target,这star执行t默认run()方法。
terminate():无论任务是否完成,当即中止工做进程
属性
daemon:和线程的setDeamon功能同样
name:进程名字。
pid:进程号。
import time from multiprocessing import Process def foo(i): time.sleep(1) print (p.is_alive(),i,p.pid) time.sleep(1) if __name__ == '__main__': p_list=[] for i in range(10): p = Process(target=foo, args=(i,)) #p.daemon=True p_list.append(p) for p in p_list: p.start() # for p in p_list: # p.join() print('main process end')
from multiprocessing import Process, Queue import queue def f(q,n): #q.put([123, 456, 'hello']) q.put(n*n+1) print("son process",id(q)) if __name__ == '__main__': q = Queue() #try: q=queue.Queue() print("main process",id(q)) for i in range(3): p = Process(target=f, args=(q,i)) p.start() print(q.get()) print(q.get()) print(q.get())
The Pipe()
function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:
from multiprocessing import Process, Pipe def f(conn): conn.send([12, {"name":"yuan"}, 'hello']) response=conn.recv() print("response",response) conn.close() print("q_ID2:",id(child_conn)) if __name__ == '__main__': parent_conn, child_conn = Pipe() print("q_ID1:",id(child_conn)) p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" parent_conn.send("儿子你好!") p.join()
The two connection objects returned by Pipe()
represent the two ends of the pipe. Each connection object has send()
and recv()
methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.
#!/usr/bin/env python # -*- coding: utf-8 -*- import multiprocessing # 管道消费者. def consumer(lock,pipe): output_p, input_p = pipe input_p.close() # 关闭管道输入口 while True: lock.acquire() item = output_p.recv() lock.release() if item == None: break # 处理部分 lock.acquire() print(item) lock.release() # 管道生产者 def producer(sequence, input_p): for item in sequence: # Put the item on the queue input_p.send(item) if __name__ == '__main__': # 进程数、建立管道,锁等 p_num = 2 process = [] (output_p, input_p) = multiprocessing.Pipe() lock = multiprocessing.Lock() # 定义消费进程 for i in range(p_num): t =multiprocessing.Process(target=consumer,args=(lock,(output_p, input_p),)) t.daemon=True process.append(t) # 启动消费进程 for i in range(p_num): process[i].start() # 关闭输出管道,以往管道填充数据 output_p.close() sequence = range(100) + [None]*p_num producer(sequence, input_p) # 数据填充完毕,打开输入管道 input_p.close() # 等待结束 for i in range(p_num): process[i].join()
Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另外一个进程的数据。
A manager object returned by Manager()
controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager()
will support types list
, dict
, Namespace
, Lock
, RLock
, Semaphore
, BoundedSemaphore
, Condition
, Event
, Barrier
, Queue
, Value
and Array
. For example:
from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.append(1) print(l) if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(5)) p_list = [] for i in range(10): p = Process(target=f, args=(d, l)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l)
Without using the lock output from the different processes is liable to get all mixed up.
from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print('hello world', i) finally: l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
python中,进程池内部会维护一个进程序列。当须要时,程序会去进程池中获取一个进程。若是进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。
同步是指一个进程在执行某个请求的时候,必需要到收到对方返回的信息才继续执行下去。
异步是指进程在执行某个请求时,无论其余的进程的状态,这个进程就执行后续操做;
当有消息返回时系统会通知进程进行处理,这样能够提升执行的效率。例如:打电话就是同步通讯,发信息就是异步通讯。
代码以下:
from multiprocessing import Pool
import time
def func(args):
time.sleep(1) #程序休眠1s
print("%s------>%s"%(args,time.ctime())) #打印参数及时间
if __name__=="__main__":
p1=Pool(2) #设定开启2个进程池
for i in range(10):
p1.apply_async(func=func,args=(i,)) #设定异步执行任务
p1.close() #关闭进程池
time.sleep(2) #程序休眠2s
p1.terminate() #关闭进程池
p1.join() #阻塞进程池
print("ending") #打印结束语句
>>>
0------>Thu Jul 20 20:18:43 2017
1------>Thu Jul 20 20:18:43 2017
ending
能够看到,在程序执行过程当中,关闭进程池,则程序会当即中止,不会再继续执行后续语句。
代码以下:
from multiprocessing import Pool import time def func(args): time.sleep(1) #休眠1s print("%s------>%s"%(args,time.ctime())) #打印传递的参数及时间 if __name__=="__main__": p1=Pool(2) #定义2个进程池 for i in range(10): #定义循环10次 p1.apply_async(func=func,args=(i,)) #异步执行任务 p1.close() #等待全部的任务都完成才关闭进程池 p1.join() print("ending")
执行结果以下:
0------>Thu Jul 20 20:19:12 2017
1------>Thu Jul 20 20:19:12 2017
2------>Thu Jul 20 20:19:13 2017
3------>Thu Jul 20 20:19:13 2017
4------>Thu Jul 20 20:19:14 2017
5------>Thu Jul 20 20:19:14 2017
6------>Thu Jul 20 20:19:15 2017
7------>Thu Jul 20 20:19:15 2017
8------>Thu Jul 20 20:19:16 2017
9------>Thu Jul 20 20:19:16 2017
ending
参考文档:
1. https://www.cnblogs.com/huanxiyun/articles/5826902.html
2. http://www.javashuo.com/article/p-xkqrdjgb-c.html
上下文管理器的任务是:代码块执行前准备,代码块执行后收拾
如何打开一个文件,并写入"hello world"
filename="my.txt" mode="w" f=open(filename,mode) f.write("hello world") f.close()
当发生异常时(如磁盘写满),就没有机会执行第5行。固然,咱们能够采用try-finally语句块进行包装:
writer=open(filename,mode) try: writer.write("hello world") finally: writer.close()
当咱们进行复杂的操做时,try-finally语句就会变得丑陋,采用with语句重写:
with open(filename,mode) as writer: writer.write("hello world")
as指代了从open()函数返回的内容,并把它赋给了新值。with完成了try-finally的任务。
with语句的做用相似于try-finally,提供一种上下文机制。要应用with语句的类,其内部必须提供两个内置函数__enter__和__exit__。前者在主体代码执行前执行,后者在主体代码执行后执行。as后面的变量,是在__enter__函数中返回的。
class echo(): def output(self): print "hello world" def __enter__(self): print "enter" return self #能够返回任何但愿返回的东西 def __exit__(self,exception_type,value,trackback): print "exit" if exception_type==ValueError: return True else: return Flase >>>with echo as e: e.output() 输出: enter hello world exit
完备的__exit__函数以下:
def __exit__(self,exc_type,exc_value,exc_tb)
其中,exc_type:异常类型;exc_value:异常值;exc_tb:异常追踪信息
当__exit__返回True时,异常不传播
contextlib模块的做用是提供更易用的上下文管理器,它是经过Generator实现的。contextlib中的contextmanager做为装饰器来提供一种针对函数级别的上下文管理机制,经常使用框架以下:
from contextlib import contextmanager @contextmanager def make_context(): print 'enter' try: yield "ok" except RuntimeError,err: print 'error',err finally: print 'exit' >>>with make_context() as value: print value 输出为: enter ok exit
其中,yield写入try-finally中是为了保证异常安全(能处理异常)as后的变量的值是由yield返回。yield前面的语句可看做代码块执行前操做,yield以后的操做能够看做在__exit__函数中的操做。
以线程锁为例:
@contextlib.contextmanager def loudLock(): print 'Locking' lock.acquire() yield print 'Releasing' lock.release() with loudLock(): print 'Lock is locked: %s' % lock.locked() print 'Doing something that needs locking' #Output: #Locking #Lock is locked: True #Doing something that needs locking #Releasing
对于:
with open(filename,mode) as reader:
with open(filename1,mode1) as writer:
writer.write(reader.read())
能够经过contextlib.nested进行简化:
with contextlib.nested(open(filename,mode),open(filename1,mode1)) as (reader,writer):
writer.write(reader.read())
在python 2.7及之后,被一种新的语法取代:
with open(filename,mode) as reader,open(filename1,mode1) as writer:
writer.write(reader.read())
file类直接支持上下文管理器API,但有些表示打开句柄的对象并不支持,如urllib.urlopen()返回的对象。还有些遗留类,使用close()方法而不支持上下文管理器API。为了确保关闭句柄,须要使用closing()为它建立一个上下文管理器(调用类的close方法)。
import contextlib class myclass(): def __init__(self): print '__init__' def close(self): print 'close()' with contextlib.closing(myclass()): print 'ok'
>>>
__init__
ok
close()
协程,又称微线程,纤程。英文名Coroutine。
优势1: 协程极高的执行效率。由于子程序切换不是线程切换,而是由程序自身控制,所以,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优点就越明显。
优势2: 不须要多线程的锁机制,由于只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只须要判断状态就行了,因此执行效率比多线程高不少。
由于协程是一个线程执行,那怎么利用多核CPU呢?最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可得到极高的性能。
import time import queue def consumer(name): print("--->ready to eat baozi...") while True: new_baozi = yield print("[%s] is eating baozi %s" % (name,new_baozi)) #time.sleep(1) def producer(): r = con.__next__() r = con2.__next__() n = 0 while 1: time.sleep(1) print("\033[32;1m[producer]\033[0m is making baozi %s and %s" %(n,n+1) ) con.send(n) con2.send(n+1) n +=2 if __name__ == '__main__': con = consumer("c1") con2 = consumer("c2") p = producer()
greenlet是一个用C实现的协程模块,相比与python自带的yield,它可使你在任意函数之间随意切换,而不需把这个函数先声明为generator
from greenlet import greenlet def test1(): print(12) gr2.switch() print(34) gr2.switch() def test2(): print(56) gr1.switch() print(78) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()
import gevent import requests,time start=time.time() def f(url): print('GET: %s' % url) resp =requests.get(url) data = resp.text print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://www.baidu.com/'), gevent.spawn(f, 'https://www.sina.com.cn/'), ]) # f('https://www.python.org/') # # f('https://www.yahoo.com/') # # f('https://baidu.com/') # # f('https://www.sina.com.cn/') print("cost time:",time.time()-start)
更多内容参考: