进程定义:python
进程就是一个程序在一个数据集上的一次动态执行过程。进程通常由程序、数据集、进程控制块三部分组成。咱们编写的程序用来描述进程要完成哪些功能以及如何完成;数据集则是程序在执行过程当中所须要使用的资源;进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统能够利用它来控制和管理进程,它是系统感知进程存在的惟一标志。react
线程的出现是为了下降上下文切换的消耗,提升系统的并发性,并突破一个进程只能干同样事的缺陷,使到进程内并发成为可能。linux
线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程当中的最小单元,由线程ID、程序计数器、寄存器集合和堆栈共同组成。线程的引入减少了程序并发执行时的开销,提升了操做系统的并发性能。线程没有本身的系统资源。git
进程是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操做系统结构的基础。或者说进程是具备必定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位。
线程则是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。程序员
进程和线程的关系:github
(1)一个线程只能属于一个进程,而一个进程能够有多个线程,但至少有一个线程。
(2)资源分配给进程,同一进程的全部线程共享该进程的全部资源。
(3)CPU分给线程,即真正在CPU上运行的是线程。web
并行处理(Parallel Processing)是计算机系统中能同时执行两个或更多个处理的一种计算方法。并行处理可同时工做于同一程序的不一样方面。并行处理的主要目的是节省大型和复杂问题的解决时间。并发处理(concurrency Processing):指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机(CPU)上运行,但任一个时刻点上只有一个程序在处理机(CPU)上运行redis
并发的关键是你有处理多个任务的能力,不必定要同时。并行的关键是你有同时处理多个任务的能力。因此说,并行是并发的子集编程
在计算机领域,同步就是指一个进程在执行某个请求的时候,若该请求须要一段时间才能返回信息,那么这个进程将会一直等待下去,直到收到返回信息才继续执行下去;异步是指进程不须要一直等下去,而是继续执行下面的操做,无论其余进程的状态。当有消息返回时系统会通知进程进行处理,这样能够提升执行的效率。举个例子,打电话时就是同步通讯,发短息时就是异步通讯。api
import threading import time def countNum(n): # 定义某个线程要运行的函数 print("running on number:%s" %n) time.sleep(3) if __name__ == '__main__': t1 = threading.Thread(target=countNum,args=(23,)) #生成一个线程实例 t2 = threading.Thread(target=countNum,args=(34,)) t1.start() #启动线程 t2.start() print("ending!")
#继承Thread式建立 import threading import time class MyThread(threading.Thread): def __init__(self,num): threading.Thread.__init__(self) self.num=num def run(self): print("running on number:%s" %self.num) time.sleep(3) t1=MyThread(56) t2=MyThread(78) t1.start() t2.start() print("ending")
# join():在子线程完成运行以前,这个子线程的父线程将一直被阻塞。 # setDaemon(True): ''' 将线程声明为守护线程,必须在start() 方法调用以前设置,若是不设置为守护线程程序会被无限挂起。 当咱们在程序运行中,执行一个主线程,若是主线程又建立一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成 想退出时,会检验子线程是否完成。若是子线程未完成,则主线程会等待子线程完成后再退出。可是有时候咱们须要的是只要主线程 完成了,无论子线程是否完成,都要和主线程一块儿退出,这时就能够 用setDaemon方法啦''' import threading from time import ctime,sleep import time def Music(name): print ("Begin listening to {name}. {time}".format(name=name,time=ctime())) sleep(3) print("end listening {time}".format(time=ctime())) def Blog(title): print ("Begin recording the {title}. {time}".format(title=title,time=ctime())) sleep(5) print('end recording {time}'.format(time=ctime())) threads = [] t1 = threading.Thread(target=Music,args=('FILL ME',)) t2 = threading.Thread(target=Blog,args=('',)) threads.append(t1) threads.append(t2) if __name__ == '__main__': #t2.setDaemon(True) for t in threads: #t.setDaemon(True) #注意:必定在start以前设置 t.start() #t.join() #t1.join() #t2.join() # 考虑这三种join位置下的结果? print ("all over %s" %ctime())
1 daemon 2 A boolean value indicating whether this thread is a daemon thread (True) or not (False). This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False. 3 4 The entire Python program exits when no alive non-daemon threads are left. 5 6 当daemon被设置为True时,若是主线程退出,那么子线程也将跟着退出, 7 8 反之,子线程将继续运行,直到正常退出。
Thread实例对象的方法
# isAlive(): 返回线程是否活动的。 # getName(): 返回线程名。 # setName(): 设置线程名。 threading模块提供的一些方法: # threading.currentThread(): 返回当前的线程变量。 # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
'''
定义: In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.) '''
Python中的线程是操做系统的原生线程,Python虚拟机使用一个全局解释器锁(Global Interpreter Lock)来互斥线程对Python虚拟机的使用。为了支持多线程机制,一个基本的要求就是须要实现不一样线程对共享资源访问的互斥,因此引入了GIL。
GIL:在一个线程拥有了解释器的访问权以后,其余的全部线程都必须等待它释放解释器的访问权,即便这些线程的下一条指令并不会互相影响。
在调用任何Python C API以前,要先得到GIL
GIL缺点:多处理器退化为单处理器;优势:避免大量的加锁解锁操做
Python支持多线程,而解决多线程之间数据完整性和状态同步的最简单方法天然就是加锁。 因而有了GIL这把超级大锁,而当愈来愈多的代码库开发者接受了这种设定后,他们开始大量依赖这种特性(即默认python内部对象是thread-safe的,无需在实现时考虑额外的内存锁和同步操做)。慢慢的这种实现方式被发现是蛋疼且低效的。但当你们试图去拆分和去除GIL的时候,发现大量库代码开发者已经重度依赖GIL而很是难以去除了。有多难?作个类比,像MySQL这样的“小项目”为了把Buffer Pool Mutex这把大锁拆分红各个小锁也花了从5.5到5.6再到5.7多个大版为期近5年的时间,而且仍在继续。MySQL这个背后有公司支持且有固定开发团队的产品走的如此艰难,那又更况且Python这样核心开发和代码贡献者高度社区化的团队呢?
不管你启多少个线程,你有多少个cpu, Python在执行一个进程的时候会淡定的在同一时刻只容许一个线程运行。
因此,python是没法利用多核CPU实现多线程的。
这样,python对于计算密集型的任务开多线程的效率甚至不如串行(没有大量切换),可是,对于IO密集型的任务效率仍是有显著提高的。
计算密集型:
1 #coding:utf8 2 from threading import Thread 3 import time 4 5 def counter(): 6 i = 0 7 for _ in range(50000000): 8 i = i + 1 9 10 return True 11 12 13 def main(): 14 15 l=[] 16 start_time = time.time() 17 18 for i in range(2): 19 20 t = Thread(target=counter) 21 t.start() 22 l.append(t) 23 t.join() 24 25 # for t in l: 26 # t.join() 27 28 end_time = time.time() 29 print("Total time: {}".format(end_time - start_time)) 30 31 if __name__ == '__main__': 32 main() 33 34 35 ''' 36 py2.7: 37 串行:25.4523348808s 38 并发:31.4084379673s 39 py3.5: 40 串行:8.62115597724914s 41 并发:8.99609899520874s 42 43 '''
用multiprocessing替代Thread multiprocessing库的出现很大程度上是为了弥补thread库由于GIL而低效的缺陷。它完整的复制了一套thread所提供的接口方便迁移。惟一的不一样就是它使用了多进程而不是多线程。每一个进程有本身的独立的GIL,所以也不会出现进程之间的GIL争抢。
1 #coding:utf8 2 from multiprocessing import Process 3 import time 4 5 def counter(): 6 i = 0 7 for _ in range(40000000): 8 i = i + 1 9 10 return True 11 12 def main(): 13 14 l=[] 15 start_time = time.time() 16 17 for _ in range(2): 18 t=Process(target=counter) 19 t.start() 20 l.append(t) 21 #t.join() 22 23 for t in l: 24 t.join() 25 26 end_time = time.time() 27 print("Total time: {}".format(end_time - start_time)) 28 29 if __name__ == '__main__': 30 main() 31 32 33 ''' 34 35 py2.7: 36 串行:6.1565990448 s 37 并行:3.1639978885 s 38 39 py3.5: 40 串行:6.556925058364868 s 41 并发:3.5378448963165283 s 42 43 '''
固然multiprocessing也不是万能良药。它的引入会增长程序实现时线程间数据通信和同步的困难。就拿计数器来举例子,若是咱们要多个线程累加同一个变量,对于thread来讲,申明一个global变量,用thread.Lock的context包裹住三行就搞定了。而multiprocessing因为进程之间没法看到对方的数据,只能经过在主线程申明一个Queue,put再get或者用share memory的方法。这个额外的实现成本使得原本就很是痛苦的多线程程序编码,变得更加痛苦了。
总结:由于GIL的存在,只有IO Bound场景下得多线程会获得较好的性能 - 若是对并行计算性能较高的程序能够考虑把核心部分也成C模块,或者索性用其余语言实现 - GIL在较长一段时间内将会继续存在,可是会不断对其进行改进。
因此对于GIL,既然不能改变,那就学会去适应它吧!
import time import threading def addNum(): global num #在每一个线程中都获取这个全局变量 #num-=1 temp=num time.sleep(0.1) num =temp-1 # 对此公共变量进行-1操做 num = 100 #设定一个共享变量 thread_list = [] for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待全部线程执行完毕 t.join() print('Result: ', num)
锁一般被用来实现对共享资源的同步访问。为每个共享资源建立一个Lock对象,当你须要访问该资源时,调用acquire方法来获取锁对象(若是其它线程已经得到了该锁,则当前线程需等待其被释放),待资源访问完后,再调用release方法释放锁:
import threading R=threading.Lock() R.acquire() ''' 对公共数据的操做 ''' R.release()
思考
'''
一、为何有了GIL,还须要线程同步?
多线程环境下必须存在资源的竞争,那么如何才能保证同一时刻只有一个线程对共享资源进行存取?
加锁, 对, 加锁能够保证存取操做的惟一性, 从而保证同一时刻只有一个线程对共享数据存取.
一般加锁也有2种不一样的粒度的锁:
coarse-grained(粗粒度): python解释器层面维护着一个全局的锁机制,用来保证线程安全。
内核级经过GIL实现的互斥保护了内核的共享资源。
fine-grained(细粒度): 那么程序员须要自行地加,解锁来保证线程安全,
用户级经过自行加锁保护的用户程序的共享资源。
二、GIL为何限定在一个进程上?
你写一个py程序,运行起来自己就是一个进程,这个进程是有解释器来翻译的,因此GIL限定在当前进程;
若是又建立了一个子进程,那么两个进程是彻底独立的,这个字进程也是有python解释器来运行的,因此
这个子进程上也是受GIL影响的
'''
所谓死锁: 是指两个或两个以上的进程或线程在执行过程当中,因争夺资源而形成的一种互相等待的现象,若无外力做用,它们都将没法推动下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。
import threading import time mutexA = threading.Lock() mutexB = threading.Lock() class MyThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): self.fun1() self.fun2() def fun1(self): mutexA.acquire() # 若是锁被占用,则阻塞在这里,等待锁的释放 print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time())) mutexB.acquire() print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time())) mutexB.release() mutexA.release() def fun2(self): mutexB.acquire() print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time())) time.sleep(0.2) mutexA.acquire() print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time())) mutexA.release() mutexB.release() if __name__ == "__main__": print("start---------------------------%s"%time.time()) for i in range(0, 10): my_thread = MyThread() my_thread.start()
在Python中为了支持在同一线程中屡次请求同一资源,python提供了可重入锁RLock。这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源能够被屡次require。直到一个线程全部的acquire都被release,其余的线程才能得到资源。上面的例子若是使用RLock代替Lock,则不会发生死锁:
1
|
mutex
=
threading.RLock()
|
线程的一个关键特性是每一个线程都是独立运行且状态不可预测。若是程序中的其 他线程须要经过判断某个线程的状态来肯定本身下一步的操做,这时线程同步问题就 会变得很是棘手。为了解决这些问题,咱们须要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它容许线程等待某些事件的发生。在 初始状况下,Event对象中的信号标志被设置为假。若是有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程若是将一个Event对象的信号标志设置为真,它将唤醒全部等待这个Event对象的线程。若是一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行
event.isSet():返回event的状态值; event.wait():若是 event.isSet()==False将阻塞线程; event.set(): 设置event的状态值为True,全部阻塞池的线程激活进入就绪状态, 等待操做系统调度; event.clear():恢复event的状态值为False。
能够考虑一种应用场景(仅仅做为说明),例如,咱们有多个线程从Redis队列中读取数据来处理,这些线程都要尝试去链接Redis的服务,通常状况下,若是Redis链接不成功,在各个线程的代码中,都会去尝试从新链接。若是咱们想要在启动时确保Redis服务正常,才让那些工做线程去链接Redis服务器,那么咱们就能够采用threading.Event机制来协调各个工做线程的链接操做:主线程中会去尝试链接Redis服务,若是正常的话,触发事件,各工做线程会尝试链接Redis服务。
import threading import time import logging logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',) def worker(event): logging.debug('Waiting for redis ready...') event.wait() logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime()) time.sleep(1) def main(): readis_ready = threading.Event() t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1') t1.start() t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2') t2.start() logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event') time.sleep(3) # simulate the check progress readis_ready.set() if __name__=="__main__": main()
threading.Event的wait方法还接受一个超时参数,默认状况下若是事件一致没有发生,wait方法会一直阻塞下去,而加入这个超时参数以后,若是阻塞时间超过这个参数设定的值以后,wait方法会返回。对应于上面的应用场景,若是Redis服务器一致没有启动,咱们但愿子线程可以打印一些日志来不断地提醒咱们当前没有一个能够链接的Redis服务,咱们就能够经过设置这个超时参数来达成这样的目的:
def worker(event): while not event.is_set(): logging.debug('Waiting for redis ready...') event.wait(2) logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime()) time.sleep(1)
这样,咱们就能够在等待Redis服务启动的同时,看到工做线程里正在等待的状况。
Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其余线程调用release()。
实例:(同时只有5个线程能够得到semaphore,便可以限制最大链接数为5):
import threading import time semaphore = threading.Semaphore(5) def func(): if semaphore.acquire(): print (threading.currentThread().getName() + ' get semaphore') time.sleep(2) semaphore.release() for i in range(20): t1 = threading.Thread(target=func) t1.start()
应用:链接池
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
''' 建立一个“队列”对象 import Queue q = Queue.Queue(maxsize = 10) Queue.Queue类便是一个队列的同步实现。队列长度可为无限或者有限。可经过Queue的构造函数的可选参数 maxsize来设定队列长度。若是maxsize小于1就表示队列长度无限。 将一个值放入队列中 q.put(10) 调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值; 第二个block为可选参数,默认为 1。若是队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。若是block为0, put方法将引起Full异常。 将一个值从队列中取出 q.get() 调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。若是队列为空且 block为True,get()就使调用线程暂停,直至有项目可用。若是队列为空且block为False,队列将引起Empty异常。 '''
''' join() 阻塞进程,直到全部任务完成,须要配合另外一个方法task_done。 def join(self): with self.all_tasks_done: while self.unfinished_tasks: self.all_tasks_done.wait() task_done() 表示某个任务完成。每一条get语句后须要一条task_done。 import queue q = queue.Queue(5) q.put(10) q.put(20) print(q.get()) q.task_done() print(q.get()) q.task_done() q.join() print("ending!") '''
''' 此包中的经常使用方法(q = Queue.Queue()):
q.qsize() 返回队列的大小 q.empty() 若是队列为空,返回True,反之False q.full() 若是队列满了,返回True,反之False q.full 与 maxsize 大小对应 q.get([block[, timeout]]) 获取队列,timeout等待时间 q.get_nowait() 至关q.get(False)非阻塞
q.put(item) 写入队列,timeout等待时间 q.put_nowait(item) 至关q.put(item, False) q.task_done() 在完成一项工做以后,q.task_done() 函数向任务已经完成的队列发送一个信号 q.join() 实际上意味着等到队列为空,再执行别的操做 '''
''' Python Queue模块有三种队列及构造函数: 一、Python Queue模块的FIFO队列先进先出。 class queue.Queue(maxsize) 二、LIFO相似于堆,即先进后出。 class queue.LifoQueue(maxsize) 三、还有一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize) import queue #先进后出 q=queue.LifoQueue() q.put(34) q.put(56) q.put(12) #优先级 q=queue.PriorityQueue() q.put([5,100]) q.put([7,200]) q.put([3,"hello"]) q.put([4,{"name":"alex"}]) while 1: data=q.get() print(data) '''
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题因而引入了生产者和消费者模式。
生产者消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而经过阻塞队列来进行通信,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力。
这就像,在餐厅,厨师作好菜,不须要直接和客户交流,而是交给前台,而客户去饭菜也不须要不找厨师,直接去前台领取便可,这也是一个结耦的过程。
import time,random import queue,threading q = queue.Queue() def Producer(name): count = 0 while count <10: print("making........") time.sleep(random.randrange(3)) q.put(count) print('Producer %s has produced %s baozi..' %(name, count)) count +=1 #q.task_done() #q.join() print("ok......") def Consumer(name): count = 0 while count <10: time.sleep(random.randrange(4)) if not q.empty(): data = q.get() #q.task_done() #q.join() print(data) print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data)) else: print("-----no baozi anymore----") count +=1 p1 = threading.Thread(target=Producer, args=('A',)) c1 = threading.Thread(target=Consumer, args=('B',)) # c2 = threading.Thread(target=Consumer, args=('C',)) # c3 = threading.Thread(target=Consumer, args=('D',)) p1.start() c1.start() # c2.start() # c3.start()
Multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency,effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.
因为GIL的存在,python中的多线程其实并非真正的多线程,若是想要充分地使用多核CPU的资源,在python中大部分状况须要使用多进程。
multiprocessing包是Python中的多进程管理包。与threading.Thread相似,它能够利用multiprocessing.Process对象来建立一个进程。该进程能够运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象能够像多线程那样,经过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。因此,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。
# Process类调用 from multiprocessing import Process import time def f(name): print('hello', name,time.ctime()) time.sleep(1) if __name__ == '__main__': p_list=[] for i in range(3): p = Process(target=f, args=('alvin:%s'%i,)) p_list.append(p) p.start() for i in p_list: p.join() print('end') # 继承Process类调用 from multiprocessing import Process import time class MyProcess(Process): def __init__(self): super(MyProcess, self).__init__() # self.name = name def run(self): print ('hello', self.name,time.ctime()) time.sleep(1) if __name__ == '__main__': p_list=[] for i in range(3): p = MyProcess() p.start() p_list.append(p) for p in p_list: p.join() print('end')
构造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group: 线程组,目前尚未实现,库引用中提示必须是None;
target: 要执行的方法;
name: 进程名;
args/kwargs: 要传入方法的参数。
实例方法:
is_alive():返回进程是否在运行。
join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
start():进程准备就绪,等待CPU调度
run():strat()调用run方法,若是实例进程时未制定传入target,这star执行t默认run()方法。
terminate():无论任务是否完成,当即中止工做进程
属性:
daemon:和线程的setDeamon功能同样
name:进程名字。
pid:进程号。
from multiprocessing import Process import os import time def info(name): print("name:",name) print('parent process:', os.getppid()) print('process id:', os.getpid()) print("------------------") time.sleep(1) def foo(name): info(name) if __name__ == '__main__': info('main process line') p1 = Process(target=info, args=('alvin',)) p2 = Process(target=foo, args=('egon',)) p1.start() p2.start() p1.join() p2.join() print("ending")
经过tasklist(Win)或者ps -elf |grep(linux)命令检测每个进程号(PID)对应的进程名
from multiprocessing import Process, Queue import queue def f(q,n): #q.put([123, 456, 'hello']) q.put(n*n+1) print("son process",id(q)) if __name__ == '__main__': q = Queue() #try: q=queue.Queue() print("main process",id(q)) for i in range(3): p = Process(target=f, args=(q,i)) p.start() print(q.get()) print(q.get()) print(q.get())
The Pipe()
function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:
from multiprocessing import Process, Pipe def f(conn): conn.send([12, {"name":"yuan"}, 'hello']) response=conn.recv() print("response",response) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" parent_conn.send("儿子你好!") p.join()
Pipe()返回的两个链接对象表明管道的两端。 每一个链接对象都有send()和recv()方法(等等)。 请注意,若是两个进程(或线程)尝试同时读取或写入管道的同一端,管道中的数据可能会损坏。
Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另外一个进程的数据。
A manager object returned by Manager()
controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
from multiprocessing import Process, Manager def f(d, l,n): d[n] = n d["name"] ="alvin" l.append(n) #print("l",l) if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(5)) p_list = [] for i in range(10): p = Process(target=f, args=(d,l,i)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l)
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,若是进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
from multiprocessing import Pool import time def foo(args): time.sleep(1) print(args) if __name__ == '__main__': p = Pool(5) for i in range(30): p.apply_async(func=foo, args= (i,)) p.close() # 等子进程执行完毕后关闭线程池 # time.sleep(2) # p.terminate() # 马上关闭线程池 p.join()
进程池内部维护一个进程序列,当使用时,去进程池中获取一个进程,若是进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池中有如下几个主要方法:
协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程。
协程拥有本身的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其余地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。所以:
协程能保留上一次调用时的状态(即全部局部状态的一个特定组合),每次过程重入时,就至关于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
import time """ 传统的生产者-消费者模型是一个线程写消息,一个线程取消息,经过锁机制控制队列和等待,但一不当心就可能死锁。 若是改用协程,生产者生产消息后,直接经过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高。 """ # 注意到consumer函数是一个generator(生成器): # 任何包含yield关键字的函数都会自动成为生成器(generator)对象 def consumer(): r = '' while True: # 三、consumer经过yield拿到消息,处理,又经过yield把结果传回; # yield指令具备return关键字的做用。而后函数的堆栈会自动冻结(freeze)在这一行。 # 当函数调用者的下一次利用next()或generator.send()或for-in来再次调用该函数时, # 就会从yield代码的下一行开始,继续执行,再返回下一次迭代结果。经过这种方式,迭代器能够实现无限序列和惰性求值。 n = yield r if not n: return print('[CONSUMER] ←← Consuming %s...' % n) time.sleep(1) r = '200 OK' def produce(c): # 一、首先调用c.next()启动生成器 next(c) n = 0 while n < 5: n = n + 1 print('[PRODUCER] →→ Producing %s...' % n) # 二、而后,一旦生产了东西,经过c.send(n)切换到consumer执行; cr = c.send(n) # 四、produce拿到consumer处理的结果,继续生产下一条消息; print('[PRODUCER] Consumer return: %s' % cr) # 五、produce决定不生产了,经过c.close()关闭consumer,整个过程结束。 c.close() if __name__=='__main__': # 六、整个流程无锁,由一个线程执行,produce和consumer协做完成任务,因此称为“协程”,而非线程的抢占式多任务。 c = consumer() produce(c) ''' result: [PRODUCER] →→ Producing 1... [CONSUMER] ←← Consuming 1... [PRODUCER] Consumer return: 200 OK [PRODUCER] →→ Producing 2... [CONSUMER] ←← Consuming 2... [PRODUCER] Consumer return: 200 OK [PRODUCER] →→ Producing 3... [CONSUMER] ←← Consuming 3... [PRODUCER] Consumer return: 200 OK [PRODUCER] →→ Producing 4... [CONSUMER] ←← Consuming 4... [PRODUCER] Consumer return: 200 OK [PRODUCER] →→ Producing 5... [CONSUMER] ←← Consuming 5... [PRODUCER] Consumer return: 200 OK '''
greenlet机制的主要思想是:生成器函数或者协程函数中的yield语句挂起函数的执行,直到稍后使用next()或send()操做进行恢复为止。可使用一个调度器循环在一组生成器函数之间协做多个任务。greentlet是python中实现咱们所谓的"Coroutine(协程)"的一个基础库.
from greenlet import greenlet def test1(): print (12) gr2.switch() print (34) gr2.switch() def test2(): print (56) gr1.switch() print (78) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()
Python经过yield提供了对协程的基本支持,可是不彻底。而第三方的gevent为Python提供了比较完善的协程支持。
gevent是第三方库,经过greenlet实现协程,其基本思想是:
当一个greenlet遇到IO操做时,好比访问网络,就自动切换到其余的greenlet,等到IO操做完成,再在适当的时候切换回来继续执行。因为IO操做很是耗时,常常使程序处于等待状态,有了gevent为咱们自动切换协程,就保证总有greenlet在运行,而不是等待IO。
因为切换是在IO操做时自动完成,因此gevent须要修改Python自带的一些标准库,这一过程在启动时经过monkey patch完成:
import gevent import time def foo(): print("running in foo") gevent.sleep(2) print("switch to foo again") def bar(): print("switch to bar") gevent.sleep(5) print("switch to bar again") start=time.time() gevent.joinall( [gevent.spawn(foo), gevent.spawn(bar)] ) print(time.time()-start)
固然,实际代码里,咱们不会用gevent.sleep()去切换协程,而是在执行到IO操做时,gevent自动切换,代码以下:
from gevent import monkey
monkey.patch_all() import gevent from urllib import request import time def f(url): print('GET: %s' % url) resp = request.urlopen(url) data = resp.read() print('%d bytes received from %s.' % (len(data), url)) start=time.time() gevent.joinall([ gevent.spawn(f, 'https://itk.org/'), gevent.spawn(f, 'https://www.github.com/'), gevent.spawn(f, 'https://zhihu.com/'), ]) # f('https://itk.org/') # f('https://www.github.com/') # f('https://zhihu.com/') print(time.time()-start)
===========================一 gevent是一个基于协程(coroutine)的Python网络函数库,经过使用greenlet提供了一个在libev事件循环顶部的高级别并发API。 主要特性有如下几点: <1> 基于libev的快速事件循环,Linux上面的是epoll机制 <2> 基于greenlet的轻量级执行单元 <3> API复用了Python标准库里的内容 <4> 支持SSL的协做式sockets <5> 可经过线程池或c-ares实现DNS查询 <6> 经过monkey patching功能来使得第三方模块变成协做式 gevent.spawn()方法spawn一些jobs,而后经过gevent.joinall将jobs加入到微线程执行队列中等待其完成,设置超时为2秒。执行后的结果经过检查gevent.Greenlet.value值来收集。 ===========================二 1、关于Linux的epoll机制: epoll是Linux内核为处理大批量文件描述符而做了改进的poll,是Linux下多路复用IO接口select/poll的 加强版本,它能显著提升程序在大量并发链接中只有少许活跃的状况下的系统CPU利用率。epoll的优势: (1)支持一个进程打开大数目的socket描述符。select的一个进程所打开的FD由FD_SETSIZE的设置来限定,而epoll没有这个限制,它所支持的FD上限是 最大可打开文件的数目,远大于2048。 (2)IO效率不随FD数目增长而线性降低:因为epoll只会对“活跃”的socket进行操做,因而,只有”活跃”的socket才会主动去调用 callback函数,其余 idle状态的socket则不会。 (3)使用mmap加速内核与用户空间的消息传递。epoll是经过内核于用户空间mmap同一块内存实现的。 (4)内核微调。 2、libev机制 提供了指定文件描述符事件发生时调用回调函数的机制。libev是一个事件循环器:向libev注册感兴趣的事件,好比socket可读事件,libev会对所注册的事件 的源进行管理,并在事件发生时触发相应的程序。 ===========================三 ‘’‘ import gevent from gevent import socket urls = [‘www.google.com.hk’,’www.example.com’, ‘www.python.org’ ] jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls] gevent.joinall(jobs, timeout=2) [job.value for job in jobs] [‘74.125.128.199’, ‘208.77.188.166’, ‘82.94.164.162’] ’‘’ gevent.spawn()方法spawn一些jobs,而后经过gevent.joinall将jobs加入到微线程执行队列中等待其完成,设置超时为2秒。执行后的结果经过检查gevent.Greenlet.value值来收集。gevent.socket.gethostbyname()函数与标准的socket.gethotbyname()有相同的接口,但它不会阻塞整个解释器,所以会使得其余的greenlets跟随着无阻的请求而执行。 Monket patching Python的运行环境容许咱们在运行时修改大部分的对象,包括模块、类甚至函数。虽然这样作会产生“隐式的反作用”,并且出现问题很难调试,但在须要修改Python自己的基础行为时,Monkey patching就派上用场了。Monkey patching可以使得gevent修改标准库里面大部分的阻塞式系统调用,包括socket,ssl,threading和select等模块,而变成协做式运行。 from gevent import monkey ; monkey . patch_socket () import urllib2 经过monkey.patch_socket()方法,urllib2模块可使用在多微线程环境,达到与gevent共同工做的目的。 事件循环 不像其余网络库,gevent和eventlet相似, 在一个greenlet中隐式开始事件循环。没有必须调用run()或dispatch()的反应器(reactor),在twisted中是有 reactor的。当gevent的API函数想阻塞时,它得到Hub实例(执行时间循环的greenlet),并切换过去。若是没有集线器实例则会动态 建立。 libev提供的事件循环默认使用系统最快轮询机制,设置LIBEV_FLAGS环境变量可指定轮询机制。LIBEV_FLAGS=1为select, LIBEV_FLAGS = 2为poll, LIBEV_FLAGS = 4为epoll,LIBEV_FLAGS = 8为kqueue。 Libev的API位于gevent.core下。注意libev API的回调在Hub的greenlet运行,所以使用同步greenlet的API。可使用spawn()和Event.set()等异步API。
eventlet 是基于 greenlet 实现的面向网络应用的并发处理框架,提供“线程”池、队列等与其余 Python 线程、进程模型很是类似的 api,而且提供了对 Python 发行版自带库及其余模块的超轻量并发适应性调整方法,比直接使用 greenlet 要方便得多。
其基本原理是调整 Python 的 socket 调用,当发生阻塞时则切换到其余 greenlet 执行,这样来保证资源的有效利用。须要注意的是:
eventlet 提供的函数只能对 Python 代码中的 socket 调用进行处理,而不能对模块的 C 语言部分的 socket 调用进行修改。对后者这类模块,仍然须要把调用模块的代码封装在 Python 标准线程调用中,以后利用 eventlet 提供的适配器实现 eventlet 与标准线程之间的协做。
虽然 eventlet 把 api 封装成了很是相似标准线程库的形式,但二者的实际并发执行流程仍然有明显区别。在没有出现 I/O 阻塞时,除非显式声明,不然当前正在执行的 eventlet 永远不会把 cpu 交给其余的 eventlet,而标准线程则是不管是否出现阻塞,老是由全部线程一块儿争夺运行资源。全部 eventlet 对 I/O 阻塞无关的大运算量耗时操做基本没有什么帮助。
协程的好处:
无需线程上下文切换的开销
无需原子操做锁定及同步的开销
方便切换控制流,简化编程模型
高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。因此很适合用于高并发处理。
缺点:
没法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程须要和进程配合才能运行在多CPU上.固然咱们平常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
进行阻塞(Blocking)操做(如IO时)会阻塞掉整个程序
同步(synchronous) IO和异步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分别是什么,到底有什么区别?这个问题其实不一样的人给出的答案均可能不一样,好比wiki,就认为asynchronous IO和non-blocking IO是一个东西。这实际上是由于不一样的人的知识背景不一样,而且在讨论这个问题的时候上下文(context)也不相同。因此,为了更好的回答这个问题,先限定一下本文的上下文。
本文讨论的背景是Linux环境下的network IO。
Stevens在文章中一共比较了五种IO Model:
因为signal driven IO在实际中并不经常使用,因此我这只说起剩下的四种IO Model。
再说一下IO发生时涉及的对象和步骤。
对于一个network IO (这里咱们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另外一个就是系统内核(kernel)。当一个read操做发生时,它会经历两个阶段:
记住这两点很重要,由于这些IO Model的区别就是在两个阶段上各有不一样的状况。
在linux中,默认状况下全部的socket都是blocking,一个典型的读操做流程大概是这样:
当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来讲,不少时候数据在一开始尚未到达(好比,尚未收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,而后kernel返回结果,用户进程才解除block的状态,从新运行起来。
因此,blocking IO的特色就是在IO执行的两个阶段都被block了。
linux下,能够经过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操做时,流程是这个样子:
从图中能够看出,当用户进程发出read操做时,若是kernel中的数据尚未准备好,那么它并不会block用户进程,而是马上返回一个error。从用户进程角度讲 ,它发起一个read操做后,并不须要等待,而是立刻就获得了一个结果。用户进程判断结果是一个error时,它就知道数据尚未准备好,因而它能够再次发送read操做。一旦kernel中的数据准备好了,而且又再次收到了用户进程的system call,那么它立刻就将数据拷贝到了用户内存,而后返回。因此,用户进程实际上是须要不断的主动询问kernel数据好了没有。
注意:
在网络IO时候,非阻塞IO也会进行recvform系统调用,检查数据是否准备好,与阻塞IO不同,”非阻塞将大的整片时间的阻塞分红N多的小的阻塞, 因此进程不断地有机会 ‘被’ CPU光顾”。即每次recvform系统调用之间,cpu的权限还在进程手中,这段时间是能够作其余事情的,
也就是说非阻塞的recvform系统调用调用以后,进程并无被阻塞,内核立刻返回给进程,若是数据还没准备好,此时会返回一个error。进程在返回以后,能够干点别的事情,而后再发起recvform系统调用。重复上面的过程,循环往复的进行recvform系统调用。这个过程一般被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。须要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。
1 import time 2 import socket 3 sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM) 4 sk.setsockopt 5 sk.bind(('127.0.0.1',6667)) 6 sk.listen(5) 7 sk.setblocking(False) 8 while True: 9 try: 10 print ('waiting client connection .......') 11 connection,address = sk.accept() # 进程主动轮询 12 print("+++",address) 13 client_messge = connection.recv(1024) 14 print(str(client_messge,'utf8')) 15 connection.close() 16 except Exception as e: 17 print (e) 18 time.sleep(4) 19 20 #############################client 21 22 import time 23 import socket 24 sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM) 25 26 while True: 27 sk.connect(('127.0.0.1',6667)) 28 print("hello") 29 sk.sendall(bytes("hello","utf8")) 30 time.sleep(2) 31 break
优势:可以在等待任务完成的时间里干其余活了(包括提交其余任务,也就是 “后台” 能够有多个任务在同时执行)。
缺点:任务完成的响应延迟增大了,由于每过一段时间才去轮询一次read操做,而任务可能在两次轮询之间的任意时间完成。这会致使总体数据吞吐量的下降。
IO multiplexing这个词可能有点陌生,可是若是我说select,epoll,大概就都能明白了。有些地方也称这种IO方式为event driven IO。咱们都知道,select/epoll的好处就在于单个process就能够同时处理多个网络链接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的全部socket,当某个socket有数据到达了,就通知用户进程。它的流程如图:
当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”全部select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操做,将数据从kernel拷贝到用户进程。
这个图和blocking IO的图其实并无太大的不一样,事实上,还更差一些。由于这里须要使用两个system call (select 和 recvfrom),而blocking IO只调用了一个system call (recvfrom)。可是,用select的优点在于它能够同时处理多个connection。(多说一句。因此,若是处理的链接数不是很高的话,使用select/epoll的web server不必定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优点并非对于单个链接能处理得更快,而是在于能处理更多的链接。)
在IO multiplexing Model中,实际中,对于每个socket,通常都设置成为non-blocking,可是,如上图所示,整个用户的process实际上是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。
结论: select的优点在于能够处理多个链接,不适用于单个链接
1 #***********************server.py 2 3 import socket 4 import select 5 sk=socket.socket() 6 sk.bind(("127.0.0.1",8800)) 7 sk.listen(5) 8 sk.setblocking(False) 9 inputs=[sk,] 10 11 while True: 12 r,w,e=select.select(inputs,[],[],5) 13 print(len(r)) 14 15 for obj in r: 16 if obj==sk: 17 conn,add=obj.accept() 18 print("conn:",conn) 19 inputs.append(conn) 20 else: 21 22 data_byte=obj.recv(1024) 23 print(str(data_byte,'utf8')) 24 if not data_byte: 25 inputs.remove(obj) 26 continue 27 inp=input('回答%s: >>>'%inputs.index(obj)) 28 obj.sendall(bytes(inp,'utf8')) 29 30 print('>>',r) 31 32 33 #***********************client.py 34 35 import socket 36 sk=socket.socket() 37 sk.connect(('127.0.0.1',8802)) 38 39 while True: 40 inp=input(">>>>") # how much one night? 41 sk.sendall(bytes(inp,"utf8")) 42 data=sk.recv(1024) 43 print(str(data,'utf8'))
select监听fd变化的过程
用户进程建立socket对象,拷贝监听的fd到内核空间,每个fd会对应一张系统文件表,内核空间的fd响应到数据后,就会发送信号给用户进程数据已到;用户进程再发送系统调用,好比(accept)将内核空间的数据copy到用户空间,同时做为接受数据端内核空间的数据清除,这样从新监听时fd再有新的数据又能够响应到了(发送端由于基于TCP协议因此须要收到应答后才会清除)。
linux下的asynchronous IO其实用得不多。先看一下它的流程:
用户进程发起read操做以后,马上就能够开始去作其它的事。而另外一方面,从kernel的角度,当它受到一个asynchronous read以后,首先它会马上返回,因此不会对用户进程产生任何block。而后,kernel会等待数据准备完成,而后将数据拷贝到用户内存,当这一切都完成以后,kernel会给用户进程发送一个signal,告诉它read操做完成了。
到目前为止,已经将四个IO Model都介绍完了。如今回过头来回答最初的那几个问题:blocking和non-blocking的区别在哪,synchronous IO和asynchronous IO的区别在哪。
先回答最简单的这个:blocking vs non-blocking。前面的介绍中其实已经很明确的说明了这二者的区别。调用blocking IO会一直block住对应的进程直到操做完成,而non-blocking IO在kernel还准备数据的状况下会马上返回。
在说明synchronous IO和asynchronous IO的区别以前,须要先给出二者的定义。Stevens给出的定义(实际上是POSIX的定义)是这样子的:
A synchronous I/O operation causes the requesting process to be blocked until that I/O operationcompletes;
An asynchronous I/O operation does not cause the requesting process to be blocked;
二者的区别就在于synchronous IO作”IO operation”的时候会将process阻塞。按照这个定义,以前所述的blocking IO,non-blocking IO,IO multiplexing都属于synchronous IO。有人可能会说,non-blocking IO并无被block啊。这里有个很是“狡猾”的地方,定义中所指的”IO operation”是指真实的IO操做,就是例子中的recvfrom这个system call。non-blocking IO在执行recvfrom这个system call的时候,若是kernel的数据没有准备好,这时候不会block进程。可是,当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,在这段时间内,进程是被block的。而asynchronous IO则不同,当进程发起IO 操做以后,就直接返回不再理睬了,直到kernel发送一个信号,告诉进程说IO完成。在这整个过程当中,进程彻底没有被block。
各个IO Model的比较如图所示:
通过上面的介绍,会发现non-blocking IO和asynchronous IO的区别仍是很明显的。在non-blocking IO中,虽然进程大部分时间都不会被block,可是它仍然要求进程去主动的check,而且当数据准备完成之后,也须要进程主动的再次调用recvfrom来将数据拷贝到用户内存。而asynchronous IO则彻底不一样。它就像是用户进程将整个IO操做交给了他人(kernel)完成,而后他人作完后发信号通知。在此期间,用户进程不须要去检查IO操做的状态,也不须要主动的去拷贝数据。
import selectors import socket sel = selectors.DefaultSelector() def accept(sock, mask): conn, addr = sock.accept() # Should be ready print('accepted', conn, 'from', addr) conn.setblocking(False) sel.register(conn, selectors.EVENT_READ, read) def read(conn, mask): data = conn.recv(1000) # Should be ready if data: print('echoing', repr(data), 'to', conn) conn.send(data) # Hope it won't block else: print('closing', conn) sel.unregister(conn) conn.close() sock = socket.socket() sock.bind(('localhost', 1234)) sock.listen(100) sock.setblocking(False) sel.register(sock, selectors.EVENT_READ, accept) while True: events = sel.select() for key, mask in events: callback = key.data callback(key.fileobj, mask)