python-学习-python并发编程之多进程与多线程

一 multiprocessing模块介绍

    python中的多线程没法利用多核优点,若是想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分状况须要使用多进程。Python提供了multiprocessing。
    multiprocessing模块用来开启子进程,并在子进程中执行咱们定制的任务(好比函数),该模块与多线程模块threading的编程接口相似。html

  multiprocessing模块的功能众多:支持子进程、通讯和共享数据、执行不一样形式的同步,提供了Process、Queue、Pipe、Lock等组件。python

    须要再次强调的一点是:与线程不一样,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。数据库

二 Process类的介绍

    建立进程的类编程

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化获得的对象,表示一个子进程中的任务(还没有启动)

强调:
1. 须要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

    参数介绍:windows

复制代码
1 group参数未使用,值始终为None
2 
3 target表示调用对象,即子进程要执行的任务
4 
5 args表示调用对象的位置参数元组,args=(1,2,'egon',)
6 
7 kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
8 
9 name为子进程的名称
复制代码

  方法介绍:数组

复制代码
 1 p.start():启动进程,并调用该子进程中的p.run() 
 2 p.run():进程启动时运行的方法,正是它去调用target指定的函数,咱们自定义类的类中必定要实现该方法  
 3 
 4 p.terminate():强制终止进程p,不会进行任何清理操做,若是p建立了子进程,该子进程就成了僵尸进程,使用该方法须要特别当心这种状况。若是p还保存了一个锁那么也将不会被释放,进而致使死锁
 5 p.is_alive():若是p仍然运行,返回True
 6 
 7 p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,须要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程  
复制代码

    属性介绍:安全

复制代码
1 p.daemon:默认值为False,若是设为True,表明p为后台运行的守护进程,当p的父进程终止时,p也随之终止,而且设定为True后,p不能建立本身的新进程,必须在p.start()以前设置
2 
3 p.name:进程的名称
4 
5 p.pid:进程的pid
6 
7 p.exitcode:进程在运行时为None、若是为–N,表示被信号N结束(了解便可)
8 
9 p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络链接的底层进程间通讯提供安全性,这类链接只有在具备相同的身份验证键时才能成功(了解便可)
复制代码

三 Process类的使用

注意:在windows中Process()必须放到# if __name__ == '__main__':下服务器

  详细解释

建立并开启子进程的两种方式网络

  方法一
  方法二

练习1:把上周所学的socket通讯变成并发的形式多线程

  server端
  多个client端
  这么实现有没有问题??? 

Process对象的join方法

  join:主进程等,等待子进程结束
  有了join,程序不就是串行了吗???

Process对象的其余方法或属性(了解)

  terminate与is_alive
  name与pid

四 守护进程

主进程建立守护进程

  其一:守护进程会在主进程代码执行结束后就终止

  其二:守护进程内没法再开启子进程,不然抛出异常:AssertionError: daemonic processes are not allowed to have children

注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

  View Code
  迷惑人的例子

五 进程同步(锁)

进程之间数据不共享,可是共享同一套文件系统,因此访问同一个文件,或同一个打印终端,是没有问题的,

竞争带来的结果就是错乱,如何控制,就是加锁处理

part1:多个进程共享同一打印终端

  并发运行,效率高,但竞争同一打印终端,带来了打印错乱
  加锁:由并发变成了串行,牺牲了运行效率,但避免了竞争

part2:多个进程共享同一文件

文件当数据库,模拟抢票

  并发运行,效率高,但竞争写同一文件,数据写入错乱
  加锁:购票行为由并发变成了串行,牺牲了运行效率,但保证了数据安全

总结:

加锁能够保证多个进程修改同一块数据时,同一时间只能有一个任务能够进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
虽然能够用文件共享数据实现进程间通讯,但问题是:
1.效率低
2.须要本身加锁处理

 

为此mutiprocessing模块为咱们提供了基于消息的IPC通讯机制:队列和管道。
1 队列和管道都是将数据存放于内存中
2 队列又是基于(管道+锁)实现的,可让咱们从复杂的锁问题中解脱出来,
咱们应该尽可能避免使用共享数据,尽量使用消息传递和队列,避免处理复杂的同步和锁问题,并且在进程数目增多时,每每能够得到更好的可获展性。

六 队列(推荐使用)

   进程彼此之间互相隔离,要实现进程间通讯(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

 建立队列的类(底层就是以管道和锁定的方式实现)

1 Queue([maxsize]):建立共享的进程队列,Queue是多进程安全的队列,可使用Queue实现多进程之间的数据传递。 

    参数介绍:

1 maxsize是队列中容许最大项数,省略则无大小限制。    

  方法介绍:

    主要方法:
复制代码
复制代码
1 q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。若是blocked为True(默认值),而且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。若是超时,会抛出Queue.Full异常。若是blocked为False,但该Queue已满,会当即抛出Queue.Full异常。
2 q.get方法能够从队列读取而且删除一个元素。一样,get方法有两个可选参数:blocked和timeout。若是blocked为True(默认值),而且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。若是blocked为False,有两种状况存在,若是Queue有一个值可用,则当即返回该值,不然,若是队列为空,则当即抛出Queue.Empty异常.
3  
4 q.get_nowait():同q.get(False)
5 q.put_nowait():同q.put(False)
6 
7 q.empty():调用此方法时q为空则返回True,该结果不可靠,好比在返回True的过程当中,若是队列中又加入了项目。
8 q.full():调用此方法时q已满则返回True,该结果不可靠,好比在返回True的过程当中,若是队列中的项目被取走。
9 q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()同样
复制代码
复制代码
    其余方法(了解):
1 q.cancel_join_thread():不会在进程退出时自动链接后台线程。能够防止join_thread()方法阻塞
2 q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但还没有写入的数据,但将在此方法完成时立刻关闭。若是q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,若是某个使用者正在被阻塞在get()操做上,关闭生产者中的队列不会致使get()方法返回错误。
3 q.join_thread():链接队列的后台线程。此方法用于在调用q.close()方法以后,等待全部队列项被消耗。默认状况下,此方法由不是q的原始建立者的全部进程调用。调用q.cancel_join_thread方法能够禁止这种行为

  应用:

  View Code

 

    生产者消费者模型

在并发编程中使用生产者和消费者模式可以解决绝大多数并发问题。该模式经过平衡生产线程和消费线程的工做能力来提升程序的总体处理数据的速度。

    为何要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题因而引入了生产者和消费者模式。

    什么是生产者消费者模式

生产者消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而经过阻塞队列来进行通信,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力。

基于队列实现生产者消费者模型

复制代码
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(q,))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    c1.start()
    print('主')
复制代码

 

此时的问题是主进程永远不会结束,缘由是:生产者p在生产完后就结束了,可是消费者c在取空了q以后,则一直处于死循环中且卡在q.get()这一步。

解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就能够break出死循环

  生产者在生产完毕后发送结束信号None

注意:结束信号None,不必定要由生产者发,主进程里一样能够发,但主进程须要等生产者结束后才应该发送该信号

  主进程在生产者生产完毕后发送结束信号None

但上述解决方式,在有多个生产者和多个消费者时,咱们则须要用一个很low的方式去解决

  有几个生产者就须要发送几回结束信号:至关low

 

其实咱们的思路无非是发送结束信号而已,有另一种队列提供了这种机制

复制代码
   #JoinableQueue([maxsize]):这就像是一个Queue对象,但队列容许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

   #参数介绍:
    maxsize是队列中容许最大项数,省略则无大小限制。    
  #方法介绍:
    JoinableQueue的实例p除了与Queue对象相同的方法以外还具备:
    q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。若是调用此方法的次数大于从队列中删除项目的数量,将引起ValueError异常
    q.join():生产者调用此方法进行阻塞,直到队列中全部的项目均被处理。阻塞将持续到队列中的每一个项目均调用q.task_done()方法为止
复制代码
  View Code

七 管道

进程间通讯(IPC)方式二:管道(不推荐使用,了解便可)

  介绍
  基于管道实现进程间通讯(与队列的方式是相似的,队列就是管道加锁实现的)

注意:生产者和消费者都没有使用管道的某个端点,就应该将其关闭,如在生产者中关闭管道的右端,在消费者中关闭管道的左端。若是忘记执行这些步骤,程序可能再消费者中的recv()操做上挂起。管道是由操做系统进行引用计数的,必须在全部进程中关闭管道后才能生产EOFError异常。所以在生产者中关闭管道不会有任何效果,付费消费者中也关闭了相同的管道端点。

  管道能够用于双向通讯,利用一般在客户端/服务器中使用的请求/响应模型或远程过程调用,就可使用管道编写与进程交互的程序
 

八 共享数据

展望将来,基于消息传递的并发编程是大势所趋

即使是使用线程,推荐作法也是将程序设计为大量独立的线程集合

经过消息队列交换数据。这样极大地减小了对使用锁定和其余同步手段的需求,

还能够扩展到分布式系统中

进程间通讯应该尽可能避免使用本节所讲的共享数据的方式

复制代码
进程间数据是独立的,能够借助于队列或管道实现通讯,两者都是基于消息传递的

虽然进程间数据独立,但能够经过Manager实现数据共享,事实上Manager的功能远不止于此

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,
复制代码
  进程之间操做共享的数据

九 信号量(了解)

  信号量Semahpore(同线程同样)

十 事件(了解)

  Event(同线程同样)

十一 进程池

在利用Python进行系统管理的时候,特别是同时操做多个文件目录,或者远程控制多台主机,并行操做能够节约大量的时间。多进程是实现并发的手段之一,须要注意的问题是:

  1. 很明显须要并发执行的任务一般要远大于核数
  2. 一个操做系统不可能无限开启进程,一般有几个核就开几个进程
  3. 进程开启过多,效率反而会降低(开启进程是须要占用系统资源的,并且开启多余核数目的进程也没法作到并行)

例如当被操做对象数目不大时,能够直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但若是是上百个,上千个。。。手动的去限制进程数量却又太过繁琐,此时能够发挥进程池的功效。

咱们就能够经过维护一个进程池来控制进程数目,好比httpd的进程模式,规定最小进程数和最大进程数... 
ps:对于远程过程调用的高级应用程序而言,应该使用进程池,Pool能够提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,若是池尚未满,那么就会建立一个新的进程用来执行该请求;但若是池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。

    建立进程池的类:若是指定numprocess为3,则进程池会从无到有建立三个进程,而后自始至终使用这三个进程去执行全部任务,不会开启其余进程

1 Pool([numprocess  [,initializer [, initargs]]]):建立进程池 

    参数介绍:

1 numprocess:要建立的进程数,若是省略,将默认使用cpu_count()的值
2 initializer:是每一个工做进程启动时要执行的可调用对象,默认为None
3 initargs:是要传给initializer的参数组

  方法介绍:

    主要方法:
复制代码
1 p.apply(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。须要强调的是:此操做并不会在全部池工做进程中并执行func函数。若是要经过不一样参数并发地执行func函数,必须从不一样线程调用p.apply()函数或者使用p.apply_async()
2 p.apply_async(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操做,不然将接收其余异步操做中的结果。
3    
4 p.close():关闭进程池,防止进一步操做。若是全部操做持续挂起,它们将在工做进程终止前完成
5 P.jion():等待全部工做进程退出。此方法只能在close()或teminate()以后调用
复制代码

   其余方法(了解部分)

  View Code 

     应用:

  apply同步执行:阻塞式
  apply_async异步执行:非阻塞
  详解:apply_async与apply

练习2:使用进程池维护固定数目的进程(重写练习1)

  server端
  客户端

发现:并发开启多个客户端,服务端同一时间只有3个不一样的pid,干掉一个客户端,另一个客户端才会进来,被3个进程之一处理

 

  回掉函数:

须要回调函数的场景:进程池中任何一个任务一旦处理完了,就当即告知主进程:我好了额,你能够处理个人结果了。主进程则调用一个函数去处理该结果,该函数即回调函数

咱们能够把耗时间(阻塞)的任务放到进程池中,而后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。

  View Code
  爬虫案例

 

  若是在主进程中等待进程池中全部任务都执行完毕后,再统一处理结果,则无需回调函数

  View Code

python并发编程之多线程理论部分:http://www.cnblogs.com/linhaifeng/articles/7430082.html

1、开启线程的两种方式

  方式一
  方式二

 2、在一个进程下开启多个线程与在一个进程下开启多个子进程的区别

  1 谁的开启速度快
  2 瞅一瞅pid
  3 同一进程内的线程共享该进程的数据?

    练习一:

  多线程并发的socket服务端
  客户端

    练习二:三个任务,一个接收用户输入,一个将用户输入的内容格式化成大写,一个将格式化后的结果存入文件

  View Code

3、守护线程 

不管是进程仍是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁

须要强调的是:运行完毕并不是终止运行

#1.对主进程来讲,运行完毕指的是主进程代码运行完毕

#2.对主线程来讲,运行完毕指的是主线程所在的进程内全部非守护线程通通运行完毕,主线程才算运行完毕

详细解释:

#1 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),而后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(不然会产生僵尸进程),才会结束,

#2 主线程在其余非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。由于主线程的结束意味着进程的结束,进程总体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
 1 from threading import Thread
 2 import time
 3 def sayhi(name):
 4     time.sleep(2)
 5     print('%s say hello' %name)
 6 
 7 if __name__ == '__main__':
 8     t=Thread(target=sayhi,args=('egon',))
 9     t.setDaemon(True) #必须在t.start()以前设置
10     t.start()
11 
12     print('主线程')
13     print(t.is_alive())
14     '''
15     主线程
16     True
17     '''
例子
  迷惑人的例子

 4、同步锁

复制代码
三个须要注意的点:
#1.线程抢的是GIL锁,GIL锁至关于执行权限,拿到执行权限后才能拿到互斥锁Lock,其余线程也能够抢到GIL,但若是发现Lock仍然没有被释放则阻塞,即使是拿到执行权限GIL也要马上交出来

#2.join是等待全部,即总体串行,而锁只是锁住修改共享数据的部分,即部分串行,要想保证数据安全的根本原理在于让并发变成串行,join与互斥锁均可以实现,毫无疑问,互斥锁的部分串行效率要更高

#3. 必定要看本小节最后的GIL与互斥锁的经典分析    GIL:连接:http://www.cnblogs.com/linhaifeng/articles/7449853.html

复制代码

GIL VS Lock

    Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为何这里还须要lock? 

 首先咱们须要达成共识:锁的目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据

    而后,咱们能够得出结论:保护不一样的数据就应该加不一样的锁。

 最后,问题就很明朗了,GIL 与Lock是两把锁,保护的数据不同,前者是解释器级别的(固然保护的就是解释器级别的数据,好比垃圾回收的数据),后者是保护用户本身开发的应用程序的数据,很明显GIL不负责这件事,只能用户自定义加锁处理,即Lock

过程分析:全部线程抢的是GIL锁,或者说全部线程抢的是执行权限

  线程1抢到GIL锁,拿到执行权限,开始执行,而后加了一把Lock,尚未执行完毕,即线程1还未释放Lock,有可能线程2抢到GIL锁,开始执行,执行过程当中发现Lock尚未被线程1释放,因而线程2进入阻塞,被夺走执行权限,有可能线程1拿到GIL,而后正常执行到释放Lock。。。这就致使了串行运行的效果

  既然是串行,那咱们执行

  t1.start()

  t1.join

  t2.start()

  t2.join()

  这也是串行执行啊,为什么还要加Lock呢,需知join是等待t1全部的代码执行完,至关于锁住了t1的全部代码,而Lock只是锁住一部分操做共享数据的代码。

  详细
复制代码
from threading import Thread
import os,time
def work():
    global n
    temp=n
    time.sleep(0.1)
    n=temp-1
if __name__ == '__main__':
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #结果可能为99
复制代码

锁一般被用来实现对共享资源的同步访问。为每个共享资源建立一个Lock对象,当你须要访问该资源时,调用acquire方法来获取锁对象(若是其它线程已经得到了该锁,则当前线程需等待其被释放),待资源访问完后,再调用release方法释放锁:

复制代码
import threading

R=threading.Lock()

R.acquire()
'''
对公共数据的操做
'''
R.release()
复制代码
  View Code
  GIL锁与互斥锁综合分析(重点!!!)
  互斥锁与join的区别(重点!!!)

  1.死锁现象与递归锁

进程也有死锁与递归锁,在进程那里忘记说了,放到这里一切说了额

所谓死锁: 是指两个或两个以上的进程或线程在执行过程当中,因争夺资源而形成的一种互相等待的现象,若无外力做用,它们都将没法推动下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,以下就是死锁

  View Code

解决方法,递归锁,在Python中为了支持在同一线程中屡次请求同一资源,python提供了可重入锁RLock。

这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源能够被屡次require。直到一个线程全部的acquire都被release,其余的线程才能得到资源。上面的例子若是使用RLock代替Lock,则不会发生死锁:

mutexA=mutexB=threading.RLock() #一个线程拿到锁,counter加1,该线程内又碰到加锁的状况,则counter继续加1,这期间全部其余线程都只能等待,等待该线程释放全部锁,即counter递减到0为止

5、定时器

定时器,指定n秒后执行某操做

复制代码
from threading import Timer
 
 
def hello():
    print("hello, world")
 
t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed
复制代码

 

6、paramiko模块

1. 介绍:

paramiko是一个用于作远程控制的模块,使用该模块能够对远程服务器进行命令或文件操做,值得一说的是,fabric和ansible内部的远程管理就是使用的paramiko来现实。

2. 下载安装

pip3 install paramiko #在python3中
  在python2中

3. 使用

SSHClient

用于链接远程服务器并执行基本命令

基于用户名密码链接:

复制代码
import paramiko

# 建立SSH对象
ssh = paramiko.SSHClient()
# 容许链接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 链接服务器
ssh.connect(hostname='120.92.84.249', port=22, username='root', password='xxx')

# 执行命令
stdin, stdout, stderr = ssh.exec_command('df')
# 获取命令结果
result = stdout.read()
print(result.decode('utf-8'))
# 关闭链接
ssh.close()
复制代码
  SSHClient 封装 Transport

基于公钥密钥链接:

客户端文件名:id_rsa

服务端必须有文件名:authorized_keys(在用ssh-keygen时,必须制做一个authorized_keys,能够用ssh-copy-id来制做)

  View Code
  SSHClient 封装 Transport 
  基于私钥字符串进行链接

SFTPClient

用于链接远程服务器并执行上传下载

基于用户名密码上传下载

  View Code

基于公钥密钥上传下载

  View Code
  Demo 
相关文章
相关标签/搜索