进程与multiprocessing模块

一 进程 html

  进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操做系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。它是操做系统动态执行的基本单元,在传统的操做系统中,进程既是基本的分配单元,也是基本的执行单元。  ————百度百科python

  PS   os模块的getpid方法就是获取当前进程的进程号(id)。json

  多道技术产生的背景:针对单核,实现并发。windows

  多路复用分为时间上的复用和空间上的复用。缓存

  空间上的复用:将内存分为几个部分,互不干扰。安全

  时间上的复用:网络

         1 遇到I/O阻塞时切换任务。并发

         2 任务执行固定时间后主动切换。app

 

壹:Process类,建立子进程dom

一 建立子进程

  建立子进程的方法一

import multiprocessing
import time import os def foo(): time.sleep(1) print('子进程 %s 父进程 %s' %(os.getpid(),os.getppid())) if __name__ == '__main__': #在windows下必须加上这一句代码 p1=multiprocessing.Process(target=foo) p2=multiprocessing.Process(target=foo) p1.start() p2.start() p1.join() #主进程等待子进程完成,在执行主进程 p2.join() print('主进程 %s 主进程的父进程是 %s,这是pycharm的进程'%(os.getpid(),os.getppid()))

  输出:

子进程 1260 父进程 12808
子进程 2256 父进程 12808 主进程 12808 主进程的父进程是 8804,这是pycharm的进程

  建立子进程的方法二

import multiprocessing
import os class Pro(multiprocessing.Process): def __init__(self,name): super().__init__() self.name=name def run(self): print('进行姓名',self.name) print('子进程 %s 父进程 %s'%(os.getpid(),os.getppid())) if __name__ == '__main__': p=Pro('egon') p.start() print('主进程 %s' %os.getpid())

  输出:

主进程 12632
进行姓名 egon
子进程 10300  父进程 12632

  

  建立子进程方法二的应用

import multiprocessing
import socket
server = socket.socket()
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
server.bind(('127.0.0.1', 8080))
server.listen(5)
class Myprocess(multiprocessing.Process):
    def __init__(self,conn):
        super().__init__()
        self.conn = conn
    def run(self):
        while True:
            try:
                while True:
                    date=self.conn.recv(1024)
                    if date==b'q':
                        break
                    self.conn.send(date.upper())
            except  Exception:
                break
if __name__ == '__main__':
    while True:
        conn,addr=server.accept()
        print('conn',conn,'addr',addr)
        s=Myprocess(conn)
        s.start()
server.close()

  在服务端,用multiprocessing模块开启多个子进程时,格式是这样的:

  重要!

import multiprocessing
import socket
class Myprocess(multiprocessing.Process):
    def __init__(self,conn):
        self.conn=conn
    def run(self):
        pass           #核心代码
if __name__ == '__main__':
    server=socket.socket()            #
    server.bind(('127.0.0.1',8080))   #
    server.listen()                   #这三行代码固定不动
    while True:                      
        conn,addr=server.accept()     #其实一个子进程就是一个conn
        p=Myprocess(conn)             #服务端建立多个子进程本应该就是把conn当作参数传给Myprocess
        p.start()                     #生成p对象,p.start()子进程开启,conn有了一个属于本身的子进程。

 

 

 

二 Process类的其余方法

  1  join方法

  官方文档的意思是:阻塞当前进程,直到调用join方法的那个进程执行完,再继续执行当前进程。

import multiprocessing
class Myprocess(multiprocessing.Process):
    def __init__(self,x):
        super().__init__()
        self.x=x
    def run(self):
        print('子进程','----')
if __name__ == '__main__':
    p1=Myprocess(1)
    p1.start()
    print('主进程','====')

  输出:

主进程 ====
子进程 ----

  在加入p1.join()代码以后,p1子进程会先执行完,在执行主进程。

import multiprocessing
class Myprocess(multiprocessing.Process):
    def __init__(self,x):
        super().__init__()
        self.x=x
    def run(self):
        print('子进程','----')
if __name__ == '__main__':
    p1=Myprocess(1)
    p1.start()
    p1.join()
    print('主进程','====')

  输出:

子进程 ----
主进程 ====

  2 daemon() 守护进程

  守护进程(daemon)是一类在后台运行的特殊进程,用于执行特定的系统任务。不少守护进程在系统引导的时候启动,而且一直运行直到系统关闭。

  守护进程会在主进程代码执行完毕后终止。

  守护进程内没法再开启子进程,若是这样作,会报错。

  使用方法  

    p1.daemon=True,放在start()方法以前。

import multiprocessing
import time
class Myprocess(multiprocessing.Process):
    def __init__(self,x):
        super().__init__()
        self.x=x
    def run(self):
        print('子进程{}'.format(self.x),'----')
        time.sleep(2)
        print('子进程{}'.format(self.x),'=====')
if __name__ == '__main__':
    p1=Myprocess(1)
    p2=Myprocess(2)
    p1.daemon=True      #p1是守护进程,主进程代码执行完毕后,立马结束。
    p1.start()
    p2.start()
    time.sleep(1)       #一秒钟,足骨欧p1,p2开启子进程
    print('主进程','====')

  输出:

子进程1 ----
子进程2 ----
主进程 ====
子进程2 =====           #由于p1是守护进程,主进程代码执行完毕后,就立马结束了。因此没有打印‘子进程1 ===’

 

贰:  Lock类,建立互斥锁。只能一次acquire,而后release才能使用。

   Rlock类,建立递归所。解决死锁问题。递归全部个引用计数,能够屡次acquire,release。

  同步可以保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。互斥锁为资源引入一个状态:锁定/非锁定。某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其余线程不能更改;直到该释放资源,将资源的状态变成“非锁定”,其余的线程才能再次锁定该资源。互斥锁保证了每次只有一个进行写入操做,从而保证了多状况下数据的正确性。

  与join方法相似,做用是并发变为串行。

  应用:

  抢票的过程,分为查票的余量和买票。查票的余量应该是并发,买票应该是串行。

import json
import time
import random
import multiprocessing
def search():
    date=json.load(open('db.txt','r'))
    print('票数:{}'.format(date['count']))
def get(i):
    date = json.load(open('db.txt', 'r'))
    if date['count']>0:
        date['count']-=1
        time.sleep(random.randint(1,3))      #模拟网络延迟
        json.dump(date,open('db.txt','w'))
        print('{} 抢票成功!'.format(i))

def rob_ticket(i,lock):         #其实能够没有get,search函数。彻底能够合并在一块儿。可是,分为两个小函数,逻辑很是清晰。加锁也变得更加容易,不出错。
    search()
    lock.acquire()  #加锁                       #with lock:  和文件操做同样,也能够简化。
    get(i)                                      #   get(i)
    lock.release()  #释放锁,解锁


if __name__ == '__main__':
    lock = multiprocessing.Lock()
    print('lock',lock)         #主进程建立了一个互斥锁,做为参数传给子进程。
    for i in range(1,21):      #建立20个子进程
        p=multiprocessing.Process(target=rob_ticket,args=(i,lock))
        p.start()

  输出:

lock <Lock(owner=None)>
票数:2
票数:2
票数:2
票数:2
票数:2
票数:2
票数:2
票数:2
票数:2
票数:2
票数:2
票数:2
票数:2
票数:2
票数:2
票数:2
票数:2
票数:2
票数:2
票数:2
1 抢票成功!
3 抢票成功!

  总结:互斥锁限定了在同一时刻只有一个进程可以对共享资源进行修改。弊端是涉及到文件的修改,文件在硬盘上,效率不可避免的会很低。

     并且还须要本身进行加锁解锁处理。因此,若是能够,尽可能寻求更好的方法。IPC机制即是答案。

 

 

叁:进程间通讯(IPC,Inter-Process Communication)指至少两个进程或线程间传送数据或信号的一些技术或方法。

python中提供了队列(Queue)和管道(Pipe)两种方法。

  1 队列和管道都是将数据存放在内存中,比起硬盘,速度会快不少。

  2 队列是基于 管道+锁 实现的,能够帮咱们从加锁的繁琐代码中解脱出来。推荐使用

 

肆 Queue类。队列。

  Queue([maxsize]), 建立共享的进程队列。maxsize是队列中容许的最大项数。若是省略此参数,则无大小限制。底层队列使用管道和锁定实现。

  put方法  put_mowait()方法

  get方法  get_nowait()方法  

import multiprocessing

q=multiprocessing.Queue(3)
print(q,)
q.put('1')
q.put('2')
q.put('3')
print(q.get())
print(q.get())
print(q.get())

  输出:

<multiprocessing.queues.Queue object at 0x000002132885A048>
1
2
3

 

伍  JoinableQueue类

  q=JoinableQueue()

  提供了Queue类两个没有的方法。

    join():阻塞,直到队列q中没有item。  

    task_done():必须跟在get()方法后面。

    

  from multiprocessing import JoinableQueue
  q = JoinableQueue()
  q.task_done() # Signal task completion
  q.join() # Wait for completion。
  
  

JoinableQueue, a Queue subclass, is a queue which additionally has task_done() and join() methods.

task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

join()

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks

from multiprocessing import Process,JoinableQueue
import time,random
def consumer(name,q):
    while True:
        time.sleep(random.randint(1,2))
        res=q.get()
        if res is None:break
        print('%s 吃了 %s'%(name,res))
        q.task_done()           #一个get()跟着一个task_done()       q.task_done()是放在消费者模型这边的。
def produce(name,q):
    for i in range(10):
        time.sleep(random.randint(1,2))
        res='包子%s'%i
        q.put(res)
        print('%s 生产了 %s '%(name,res))
    q.join()                   #若是注释掉,最后显示的时间是15秒左右,由于p1,p2代码执行完后,无论队列q中
if __name__ == '__main__':     #有没有item,p1,p2的完成表明着主进程的完成,c又是守护进程。c尽管没有消费完全部数据,也会终结。  q.join()是放在生产者模型这边的。
    start_time=time.time()     #加上join,即是阻塞状态,知道q队列中的item被c进程所有完,这样主进程代码执行完毕。c做为守护进程,也会随之终结。用时大约30秒
    q=JoinableQueue()
    p1=Process(target=produce,args=('egon',q))
    p2=Process(target=produce,args=('wupeoqi',q))
    c=Process(target=consumer,args=('alex',q))
    c.daemon=True
    c.start()
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(time.time()-start_time)

 

陆 Manager() 

  Python中进程间共享数据,处理基本的queue,pipe和value+array外,还提供了更高层次的封装。使用multiprocessing.Manager能够简单地使用这些高级接口。 

  Manager()返回的manager对象控制了一个server进程,此进程包含的python对象能够被其余的进程经过proxies来访问。从而达到多进程间数据通讯且安全。

  Manager支持的类型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。 

  Manager()也是建立的内存中的空间。

import time
def foo(d,lock):
    with lock:
        temp=d['x']
        time.sleep(0.001)
        d['x']=temp-1

from multiprocessing import Manager,Process,Lock
if __name__ == '__main__':
    m=Manager()
    lock=Lock()
    d=m.dict({'x':10})
    l=[]
    for i in range(10):
        p=Process(target=foo,args=(d,lock))
        l.append(p)
        p.start()
    for p in l:
        p.join()
    print(d)

 

柒 Pool()         !!!!!http://www.cnblogs.com/Tour/p/4564710.html  !!!很好的博客地址

   在使用Python进行系统管理时,特别是同时操做多个文件目录或者远程控制多台主机,并行操做能够节约大量的时间。若是操做的对象数目不大时,还能够直接使用Process类动态的生成多个进程,十几个还好,可是若是上百个甚至更多,那手动去限制进程数量就显得特别的繁琐,此时进程池就派上用场了。 
Pool类能够提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,若是池尚未满,就会建立一个新的进程来执行请求。若是池满,请求就会告知先等待,直到池中有进程结束,才会建立新的进程来执行这些请求。 
下面介绍一下multiprocessing 模块下的Pool类下的几个方法。

  apply():

    该函数用于传递不定参数,主进程会被阻塞直到函数执行结束,不建议使用。同步调用

  apply_async():

    apply_async(func[, args=()[, kwds={}[, callback=None]]])

    非阻塞且支持结果返回进行回调

  

    首先来看apply_async方法,源码以下:

复制代码
def apply_async(self, func, args=(), kwds={}, callback=None):
    assert self._state == RUN
    result = ApplyResult(self._cache, callback)
    self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
    return result
func表示执行此任务的方法
args、kwds分别表func的位置参数和关键字参数
callback表示一个单参数的方法,当有结果返回时,callback方法会被调用,参数即为任务执行后的结果
复制代码

    每调用一次apply_result方法,实际上就向_taskqueue中添加了一条任务,注意这里采用了非阻塞(异步)的调用方式,即apply_async方法中新建的任务只是被添加到任务队列中,还并未执行,不须要等待,直接返回建立的ApplyResult对象,注意在建立ApplyResult对象时,将它放入进程池的缓存_cache中。

    任务队列中有了新建立的任务,那么根据上节分析的处理流程,进程池的_task_handler线程,将任务从taskqueue中获取出来,放入_inqueue中,触发worker进程根据args和kwds调用func,运行结束后,将结果放入_outqueue,再由进程池中的_handle_results线程,将运行结果从_outqueue中取出,并找到_cache缓存中的ApplyResult对象,_set其运行结果,等待调用端获取。

  close():

    关闭进程池,使其再也不接收新的任务。

  join():

    主进程阻塞等待子进程的退出,join方法必须用在close()方法以后,二者搭配使用。

  PS 回调函数:

    回调函数就是一个经过函数指针调用的函数。若是你把函数的指针(地址)做为参数传递给另外一个函数,当这个指针被用来调用其所指向的函数时,咱们就说这是回调函数。回调函数不是由该函数的实现方直接调用,而是在特定的事件或条件发生时由另外的一方调用的,用于对该事件或条件进行响应。                ——百度百科

    你到一个商店买东西,恰好你要的东西没有货,因而你在店员那里留下了你的电话,过了几天店里有货了,店员就打了你的电话,而后你接到电话后就到店里去取了货。在这个例子里,你的电话号码就叫回调函数,你把电话留给店员就叫登记回调函数,店里后来有货了叫作触发了回调关联的事件,店员给你打电话叫作调用回调函数,你到店里去取货叫作响应回调事件。  ——知乎回答
    

    callback函数是一个以参数形式传递给另外一个函数的函数,而且该函数(指callback函数)必须等另外一个函数执行完才会被调用!(当被调用时,另外一个函数就是callback函数的父函数)。

    理解起来可能有点绕,通俗点的例子:

    函数a有一个参数,这个参数是个函数b,当函数a执行完之后执行函数b。那么这个过程就叫回调。

    这里必须强调的一点:函数b是你以参数形式传给函数a的,那么函数b被调用时就叫回调函数。


  PS 同步与异步   

    同步和异步关注的是消息通讯机制 (synchronous communication/ asynchronous communication)
  所谓同步,就是在发出一个*调用*时,在没有获得结果以前,该*调用*就不返回。可是一旦调用返回,就获得返回值了。
  换句话说,就是由*调用者*主动等待这个*调用*的结果。

    而异步则是相反,*调用*在发出以后,这个调用就直接返回了,因此没有返回结果。换句话说,当一个异步过程调用发出后,调用者不会马上获得结果。而是在*调用*发出后,*被调用者*经过状态、通知来通知调用者,或经过回调函数处理这个调用。

    同步I/O操做:致使请求进程阻塞,直到I/O操做完成;

    异步I/O操做:不致使请求进程阻塞。

   
from multiprocessing import Process,Pool
import time,os
def foo(n):
    print(n)
    time.sleep(5)
    print('%s is working '%os.getpid())
    return  n**2

if __name__ == '__main__':
    p=Pool(5)
    objs=[]
    for i in range(10):
        obj=p.apply_async(foo,args=(i,))
        objs.append(obj)
    p.close()
    p.join()
    print(objs)
    for obj in objs:
        print(obj.get())

  输出: 

0
1
2
3
4
14344 is working 
5
15624 is working 
6
5312 is working 
7
16000 is working 
8
11868 is working 
9
14344 is working 
15624 is working 
5312 is working 
16000 is working 
11868 is working 
[<multiprocessing.pool.ApplyResult object at 0x000001B9953AF860>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AF908>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AF9B0>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFA90>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFB70>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFC50>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFD30>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFE10>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFEF0>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFFD0>]
0
1
4
9
16
25
36
49
64
81

  注意:看obj是什么,用get()方法取其值。

 

  Pool类的异步以及回调函数。回调函数能够用在爬虫上。

import requests,os
from multiprocessing import Pool,Process
def get(url):
    r=requests.get(url)
    print('进程%s get %s'%(os.getpid(),url))
    return {'url':url,'text':len(r.text)}
def search(dic):
    with open('db.txt','a')as f:             # a 模式 也能够建立不存在的文件名
        date='url: %s lenth: %s\n'%(dic['url'],dic['text'])
        f.write(date)
if __name__ == '__main__':
    p=Pool(3)
    l=[]
    url_l=['http://cn.bing.com/','http://www.cnblogs.com/wupeiqi/','http://www.cnblogs.com/654321cc/',
           'https://www.cnblogs.com/','http://society.people.com.cn/n1/2017/1012/c1008-29581930.html',
           'http://www.xilu.com/news/shaonianxinzangyou5gedong.html',]
    for url in url_l:
        obj=p.apply_async(get,(url,),callback=search)   #在这里,apply_async,建立了进程。search是回调函数,有且惟一参数是get函数的返回值,
        l.append(obj)                                   #obj一直是ApplyResult object
    p.close()
    p.join()
    print(l)
    for obj in l:
        print(obj.get())                                #obj.get()一直是get()函数的返回值,无论有没有回调函数。

  输出:

进程14044 get http://www.cnblogs.com/wupeiqi/
进程13000 get http://www.cnblogs.com/654321cc/
进程15244 get http://cn.bing.com/
进程15244 get http://www.xilu.com/news/shaonianxinzangyou5gedong.html
进程14044 get https://www.cnblogs.com/
进程13000 get http://society.people.com.cn/n1/2017/1012/c1008-29581930.html
[<multiprocessing.pool.ApplyResult object at 0x0000027D4C893BE0>, <multiprocessing.pool.ApplyResult object at 0x0000027D4C893C88>, <multiprocessing.pool.ApplyResult object at 0x0000027D4C893D30>, <multiprocessing.pool.ApplyResult object at 0x0000027D4C893DD8>, <multiprocessing.pool.ApplyResult object at 0x0000027D4C893E80>, <multiprocessing.pool.ApplyResult object at 0x0000027D4C893F60>]
{'url': 'http://cn.bing.com/', 'text': 127210}
{'url': 'http://www.cnblogs.com/wupeiqi/', 'text': 21292}
{'url': 'http://www.cnblogs.com/654321cc/', 'text': 13268}
{'url': 'https://www.cnblogs.com/', 'text': 40331}
{'url': 'http://society.people.com.cn/n1/2017/1012/c1008-29581930.html', 'text': 23641}
{'url': 'http://www.xilu.com/news/shaonianxinzangyou5gedong.html', 'text': 51247}
相关文章
相关标签/搜索