032_进程2

守护进程html

  1)正常的子进程没有执行完的时候主进程要一直等着python

  2)守护进程的进程的做用:
           守护进程会随着主进程的代码执行结束(此时主进程不必定结束)而结束。
  3)守护进程是否结束的判断标准是 :
           主进程的代码是否执行完。不是主进程是否结束。
  4)主进程的代码执行完后会要等着全部子进程结束以后结束。
  5)守护进程不会等待其它子进程结束,和子进程无关。但若是想让守护进程等待某个子程序结束后再结束,能够在主程序后使用p.join()设置阻塞,即主程序会等待该子进程,而等待这个代码是主进程的代码,即主进程代码没有执行完,因此守护进程不会结束。
  6)守护进程 要在start以前设置
  7)守护进程中 不能再开启子进程编程

1,开一个守护进程json

  1)父进程作本身的事,开一个报时器子进程,每过一秒通报一次。可是父进程的代码执行完后子进程并不会结束。
  2)设置守护进程后,父进程的代码执行完后,子进程也会结束。数组

import time
from multiprocessing import Process

def cal_time():        # 定义一个报时器
    while True:
        time.sleep(1)
        print('过去了1秒')

if __name__ == '__main__':
    p = Process(target=cal_time)
# 必定在开启进程以前设置 # 将接下来开启的一个进程设置成守护进程
#    p.daemon = True     
    p.start()
    for i in range(100):    # 10s
        time.sleep(0.1)
        print('*'*i)

2,使守护进程等待一个子进程结束以后在结束安全

import time
from multiprocessing import Process
def func():
    print('--'*10)
    time.sleep(15)
    print('--'*10)

def cal_time():
    while True:
        time.sleep(1)
        print('过去了1秒')

if __name__ == '__main__':
    p = Process(target=cal_time)
    p.daemon = True     # 必定在开启进程以前设置
    p.start()
    p2 = Process(target=func)  # 15s
    p2.start()
    for i in range(100):    # 10s
        time.sleep(0.1)
        print('*'*i)
    p2.join()       # 使守护进程等待一个子进程结束以后在结束
# 此处阻塞,子进程p2不结束主程序代码就不会执行完

process 中的部分方法网络

3,process 中的方法  p.is_alive()  p.terminate()并发

  # p.is_alive()   # 是否活着 True表明进程还在 False表明进程不在了
  # p.terminate()  # 结束一个进程,可是这个进程不会马上被杀;强制终止进程p,不会进行任何清理操做,若是p建立了子进程,该子进程就成了僵尸进程,使用该方法须要特别当心这种状况。若是p还保存了一个锁那么也将不会被释放,进而致使死锁app

import time
from multiprocessing import Process
def func():
    print('wahaha')
    time.sleep(5)
    print('qqxing')
if __name__ == '__main__':
    p = Process(target=func)
    p.start()
    print(p.is_alive())     # True
    time.sleep(0.1)
    p.terminate()           # 给操做系统发出结束进程的请求,# 是个异步操做,同步的会等在这。
    print(p.is_alive())     # True   #不会当即结束进程,会有一点延迟。
    time.sleep(1)
    print(p.is_alive())     # False  # 进程结束。
# 结果:
# True
# wahaha
# True
# False

process 中的部分属性dom

 4,process 中的属性

  # pid   查看这个进程 进程id
  # name  查看这个进程的名字

def func():
    print('wahaha')
    time.sleep(5)
    print('qqxing')
if __name__ == '__main__':
    p = Process(target=func)
    p.start()
    print(p.name,p.pid) 
    p.name = '哇哈哈哈'    # 改进程的名字
    print(p.name)

5,如何能在子进程里查看该子进程的name,pid。

class MyProcess(Process):
    def run(self):
        print('wahaha',self.name,self.pid)
        time.sleep(5)
        print('qqxing',self.name,self.pid)
if __name__ == '__main__':
    p = MyProcess()
    p.start()
    print(p.pid)

 

1)为何须要锁?
        由于实现了异步进程,各进程互不干扰,当多个进程同时访问同一个文件时会出现问题。例如,文件中记录有10张票,(多我的)多个进程同时访问,拿到的记录是同样的,每一个进程拿到的都是10,每一个进程都执行的是10-1,而后有都写入文件,文件记录为9张票。事实上,咱们知道,不止卖了一张票。为防止此情况的出现,使用了锁的方式。


2)什么是锁?
        一个文件设置一个锁,当一个进程访问文件时,其它进程就不能进入,当这个进程进行完该文件的操做。其它进程才能访问。
       至关于,一个上锁的房间,门外挂着一把钥匙,要访问的进程都去强这把钥匙,抢到的进程拿着钥匙进入房间并锁上房间,处理完出来后在把钥匙还挂在门外,其它进程在抢。


3)锁 就是在并发编程中 保证数据安全

7,建立锁

from multiprocessing import Lock
lock = Lock()   # 建立锁对象
lock.acquire()  # 锁   拿钥匙
lock.acquire()  # 锁   阻塞   当第一个拿钥匙的释放了钥匙才能拿钥匙

lock.release()  # 释放锁  还钥匙

 7.1,多进程 实现 并发

import json 
import time
import random
from multiprocessing import Lock
from multiprocessing import Process

def search(i):
    with open('ticket') as f:
        print(i,json.load(f)['count'])

def get(i):
    with open('ticket') as f:
        ticket_num = json.load(f)['count']
    time.sleep(random.random())
    if ticket_num > 0:
        with open('ticket','w') as f:
            json.dump({'count':ticket_num-1},f)
        print('%s买到票了'%i)
    else:
        print('%s没票了'%i)

def task(i,lock):
    search(i)   # 查看票
    lock.acquire()
    get(i)      # 抢票
    lock.release()

if __name__ == '__main__':
    lock = Lock()
    for i in range(20):  # 20我的同时抢票
        p = Process(target=task,args=(i,lock))
        p.start()

信号量

  就是一个上锁的文件,外面挂了设定数量的钥匙,即房间最多存在设定数量的人数。

from multiprocessing import Semaphore
sem = Semaphore(4)      # 设置为四个
sem.acquire()  # 须要钥匙
print(0)
sem.acquire()  # 须要钥匙
print(1)
sem.acquire()  # 须要钥匙
print(2)
sem.acquire()  # 须要钥匙
print(3)
sem.release()
sem.acquire()  # 须要钥匙
print(4)

8, 实例

import time
import random
from multiprocessing import Semaphore
from multiprocessing import Process
def sing(i,sem):
    sem.acquire()
    print('%s : 进入 ktv'%i)
    time.sleep(random.randint(1,10))
    print('%s : 出 ktv'%i)
    sem.release()
# 迷你唱吧  20我的,同一时间只能有4我的进去唱歌
if __name__ == '__main__':
    sem = Semaphore(4)
    for i in range(20):
        Process(target=sing,args=(i,sem)).start()

 

事件

 

9,

# recv    accept    input    sleep  同步阻塞
# lock锁   是异步阻塞
# 事件——异步阻塞
# 事件是一个通知,标志,能够同时使全部进程 都陷入阻塞

from multiprocessing import Event   #事件
e = Event()  # 实例化一个事件  标志       /交通讯号灯
e.wait()      # 刚实例化出来的一个事件对象,阻塞信号      /是红灯
                  # 执行到wait,要先看灯,绿灯行红灯停,若是在停的过程当中灯绿了,就变成非阻塞了
e.set()        # 将标志变成非阻塞        /交通灯变绿
e.clear()     # 将标志又变成阻塞      /交通灯变红

e.is_set()    # 判断是否阻塞     True就是绿灯 False就是红灯

10,实现一个红绿灯处随机经过的车辆,每一个数表明一个车辆(进程)

import time
import random
for i in range(100):       # 一百辆车
    if i%random.randint(3,6) == 0 :      #随机生成一个数,用车号除以生成数。
        time.sleep(random.randint(1,3))  #知足条件的车,等待随机几秒在经过
        print('发生等待:',end=' ')                              # 标识
    print('%s号车经过'%i) 

11,实例:红绿灯

import time
import random
from multiprocessing import Process
from multiprocessing import Event
def traffic_light(e):
    while True:
        if e.is_set():
            time.sleep(3)
            print('红灯亮')
            e.clear()      # 绿变红
        else:
            time.sleep(3)
            print('绿灯亮')
            e.set()        # 红变绿

def car(i,e):
    e.wait()  
    print('%s车经过'%i)

if __name__ == '__main__':
    e = Event()   # 立一个红灯
    tra = Process(target=traffic_light,args=(e,))
    tra.start()   # 启动一个进程来控制红绿灯
    for i in range(100):
        if i%6 == 0 :
            time.sleep(random.randint(1,3))
        car_pro = Process(target=car, args=(i,e))
        car_pro.start()

 

进程间通讯——队列

12,队列

  1.进程之间通讯 可使用multiprocessing 的 Queue模块
  2.队列有两种建立方式 第一种不传参数 这个队列就没有长度限制 ;传参数,建立一个有最大长度限制的队列
  3.提供两个重要方法;put()    get()
  4.qsize  可能出错

from multiprocessing import Queue
q = Queue(3)    # 能够设置长度
q.put(1)
q.put(2)
q.put(3)
q.put(4)

print(q.get())队列
print(q.get())
print(q.get())
print(q.get())  # 若是队列里已经没有值了 就会阻塞等待有一个值

13,进程间通讯

  一个进程往队列里放数据,一个进程能拿到队列里的数据

from multiprocessing import Process
from multiprocessing import Queue

def q_put(q):
    q.put('hello')

def q_get(q):
    print(q.get())

if __name__ =='__main__':
    q = Queue()
    p = Process(target=q_put,args=(q,))
    p.start()
    p1 = Process(target=q_get, args=(q,))
    p1.start()

14, 生产者消费者模型

  我要生产一个数据 而后 给一个函数 让这个函数依赖这个数据进行运算  拿到结果  —— 同步过程

# 但生产者生成数据很快,而消费者处理数据很慢,可使用多个相同的进程处理生产的数据。

# 作包子 和 吃包子

import time
def producer(q):  # 生产者
    for i in  range(100):    # 生产一百个包子
        q.put('包子%s'%i)

def consumer(q): #  消费者
    for i in range(100):
        time.sleep(1)
        print(q.get())

if __name__ == '__main__':
    q = Queue(10)   # 托盘  # 限制队列长度十,当队列满了,只有用一个才能在放一个新的。
		# 防止占用内存
    p = Process(target=producer,args=(q,))
    p.start()
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))
    c1.start()      #用两个消费者进程,处理数据。
    c2.start()

15,解决 数据的供需不平衡

  # 同步 生产数据 使用数据
  # 异步 主进程 生产数据 子进程使用数据   —— 问题:500个子进程去处理数据 生产过慢
  # 异步 子进程 生产数据 子进程处理数据   —— 问题:不知道生产慢仍是处理慢

  15.1,

# 3个子进程 生产包子
# 2个子进程 吃包子
# 生产的快 吃的慢  包子溢出
# 生产的慢 吃得快  包子不够吃
# 若是增长生产者 —— 让生产变快
# 减小生产者 —— 找一个容器放 约束容器的容量

import time
import random
from multiprocessing import Queue
from multiprocessing import Process
def producer(q,food):
    for i in range(5):
        q.put('%s-%s'%(food,i))
        print('生产了%s'%food)
        # time.sleep(random.randint(1,3))
        time.sleep(random.random())
    q.put(None)   # 有几个消费者进程,就要放入几个
    q.put(None)
    q.put(None)

def consumer(q,name):
    while True:
        food = q.get()   # 生产者不生产仍是生产的慢
        if food == None:break
        print('%s 吃了 %s'%(name,food))

if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer,args=(q,'包子'))
    p1.start()
    p2 = Process(target=producer, args=(q, '骨头'))
    p2.start()
    c1 = Process(target=consumer, args=(q, 'alex'))
    c1.start()
    c2 = Process(target=consumer, args=(q, 'egon'))
    c2.start()
    c3 = Process(target=consumer, args=(q, 'jin'))
    c3.start()

# 队列不会乱,由于一次出一个,先进先出。
# 如今 经过queue
# 生产者消费者模型
    #1.消费者要处理多少数据是不肯定的
    #2.因此只能用while循环来处理数据 ,可是while循环没法结束
    #3.须要生产者发送信号
    #4.有多少个消费者 就须要发送多少个信号
    #5.可是发送的信号数量须要根据 生产者和消费者的数量进行计算,因此很是不方便

16,JoinableQueue   解决上面问题

  # 使用上面的方式建立的队列对象,有join和task_done两个方法。
  # q.task_done() 代表完成队列中一个数据的处理。放到消费者代码后,当消费者处理完这个数据后执行这个代码。
  #  q.join() 放到生产者代码后,与上一个方法搭配使用,检查到队列中的全部数据都处理完了(taskdone)不在阻塞。生产者代码结束。
  # 在主进程后加上每一个生产者进程的阻塞,消费者没处理完数据,生产者就不会结束,主进程中设置的生产者阻塞就不经过,即主进程代码不会执行完。
  # 将每一个消费者进程设置成 守护进程,这样当主进程代码执行完,即生产者已经生产了全部数据并处理完毕。守护进程结束,即消费者进程结束。

import time
import random
from multiprocessing import Process
from multiprocessing import JoinableQueue

# def producer(q,food):
#     for i in range(5):
#         q.put('%s-%s'%(food,i))
#         print('生产了%s'%food)
#         # time.sleep(random.randint(1,3))
#         time.sleep(random.random())
#     q.join()  # 等待 消费者 把全部的数据都处理完
#
# def consumer(q,name):
#     while True:
#         food = q.get()   
#         print('%s 吃了 %s'%(name,food))
#         q.task_done()   # 这个数据处理完了
#
# if __name__ == '__main__':
#     q = JoinableQueue()
#     p1 = Process(target=producer,args=(q,'泔水'))
#     p1.start()
#     p2 = Process(target=producer, args=(q, '骨头'))
#     p2.start()
#     c1 = Process(target=consumer, args=(q, 'alex'))
#     c1.daemon = True    # 将消费者设置成守护进程
#     c1.start()
#     c2 = Process(target=consumer, args=(q, 'egon'))
#     c2.daemon = True
#     c2.start()
#     c3 = Process(target=consumer, args=(q, 'jin'))
#     c3.daemon = True
#     c3.start()
#
#     p1.join()  # 等待p1执行完毕
#     p2.join()  # 等待p2执行完毕
#
# # 生产者生产的数据所有被消费 —— 生产者进程结束 —— 主进程代码执行结束 —— 消费者守护进程结束

 

管道 pipe

17,pipe与两个重要方法 send()  recv()

from multiprocessing import Process
from multiprocessing import Pipe
p1,p2 = Pipe()  #支持双向通讯
p1.send('hello')
print(p2.recv())
p2.send('hi')
# p2.close()     # p2端关闭
print(p1.recv())
print(p1.recv())  # 若是p2端没有关闭,会发生阻塞 
        # p2端关闭,管道内没有数据可接收时,报错 EOFError

18,进程中用

from multiprocessing import Process
from multiprocessing import Pipe
def func(son)
    print(son.recv())  # 接收到
    print(son,recv())  #接收不到报错

if __name__ == '__main__':
    foo,son = Pipe()
    p = Process(target=func,args=(son,))
    p.start()
    foo.send('hello')
    foo.close()

19,下面的状况关闭后没有报错而是阻塞

from multiprocessing import Process
from multiprocessing import Pipe
def func(p)
    foo,son = p   # 此处形成了一个管道两端各有两个口
#     foo.close()     # 关闭子进程的foo,recv没有接收到会报错。
	            # 可是子进程foo的关闭主进程没关闭,还会阻塞。
    print(son.recv())  # 接收到
    print(son,recv())  # 阻塞

if __name__ == '__main__':
    foo,son = Pipe()
    p = Process(target=func,args=((foo,son),)) 
    p.start()
    son.close()
    foo.send('hello')
    foo.close()    # 关闭的是主进程的foo ,子进程的foo没有关闭。

  # 因此主进程要及时关闭son ,子进程要关闭不用的foo
  # 管道不安全,若是向上面同样,一个管道有多个发送端口有多个接收端口,当管道中有一个数据时,若是,两个接收端同时去拿这各数据,会形成这两个接收端都拿到了这个数据。

20,用管道也能实现生产者消费者模型

  管道+锁

def func(p,l): foo, son = p foo.close() while True: try : l.acquire() print(son.recv())  # EOFError
 l.release() except EOFError: l.release() son.close() break

def func2(p): foo, son = p son.close() for i in range(10): foo.send(i) foo.close() if __name__ == '__main__': foo,son = Pipe() l = Lock() p = Process(target=func,args=((foo,son),l)) p1 = Process(target=func,args=((foo,son),l)) p2 = Process(target=func,args=((foo,son),l)) p.start() p1.start() p2.start() p3 = Process(target=func2, args=((foo, son),)).start() p4 = Process(target=func2, args=((foo, son),)).start() p5 = Process(target=func2, args=((foo, son),)).start() p6 = Process(target=func2, args=((foo, son),)).start() p7 = Process(target=func2, args=((foo, son),)).start() son.close() foo.close()
管道实现消费者模型

21,管道或队列的先择

  管道:多个机器多个进程间通讯

  

  队列:用在同一台机器的多个进程之间通讯

 

Manager模块

22,Manager 模块

# Pipe 管道 :  双向通讯   数据不安全
# Queue 管道+锁:  双向通讯 数据安全
# JoinableQueue :数据安全
   # put 和 get的一个计数机制 ,每次get数据以后发送task_done,put端接收到计数-1,直到计数为0就能感知到
# Manager是一个类 就提供了能够进行数据共享的一个机制 提供了不少数据类型 dict list

if __name__ == '__main__':
    m = Manager()
    d = m.dict()          # 建立的字典进程间能共享的字典
    print(d)

  22.1,例

# Manager : dict list pipe ,并不提供数据安全的支持
# def func(dic):
#     print(dic)
#     # while True:
#     #     print(dic)
#     #     time.sleep(3)
# if __name__ == '__main__':
#     m = Manager()
#     d = m.dict({'count':0})
#     print(d)
#     # print(d)
#     # d['count'] = 0
#     # print(d)
#     # d = {}
#     p = Process(target=func,args=(d,))
#     p.start()
    # d['count'] = 0

  22.2,例

# from multiprocessing import Manager,Process,Lock
# def work(d,lock):
#     lock.acquire()
#     d['count'] -= 1
#     lock.release()
#
# if __name__ == '__main__':
#         lock= Lock()
#         m = Manager()
#         dic=m.dict({'count':100})   # 共享的数据
#         l = []
#         for i in range(100):
#             p=Process(target=work,args=(dic,lock))
#             p.start()
#             l.append(p)
#         [p.join() for p in l]
#         print(dic)

  

进程池

# 一个电脑四个核,无论开几个进程一次最多同时处理四个进程。
# 有100 个任务,要处理,造一个池子, 放四个进程(或五个不限,但多了没啥用)用来完成任务。
# 开启进程关进程,也须要时间,开的进程 多了花费时间就多
# 开的进程再多,一次最多处理四个进程。而进度池,是开四个或五个进程,反复使用完成任务。

概念介绍:

 

Pool([numprocess  [,initializer [, initargs]]]):建立进程池

 

  参数:

numprocess:要建立的进程数,若是省略,将默认使用cpu_count()的值
initializer:是每一个工做进程启动时要执行的可调用对象,默认为None
initargs:是要传给initializer的参数组

  主要方法:

1) p.apply(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。

   '''须要强调的是:此操做并不会在全部池工做进程中并执行func函数。若是要经过不一样参数并发地执行func函数,必须从不一样线程调用p.apply()函数或者使用p.apply_async()'''
2) p.apply_async(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。
   '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操做,不然将接收其余异步操做中的结果。'''   
3) p.close():关闭进程池,防止进一步操做。若是全部操做持续挂起,它们将在工做进程终止前完成
4) P.jion():等待全部工做进程退出。此方法只能在close()或teminate()以后调用

  其它方法

方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具备如下方法
1) obj.get():返回结果,若是有必要则等待结果到达。timeout是可选的。若是在指定时间内尚未到达,将引起一场。若是远程操做中引起了异常,它将在调用此方法时再次被引起。
2) obj.ready():若是调用完成,返回True
3) obj.successful():若是调用完成且没有引起异常,返回True,若是在结果就绪以前调用此方法,引起异常
4) obj.wait([timeout]):等待结果变为可用。
5) obj.terminate():当即终止全部工做进程,同时不执行任何清理或结束任何挂起工做。若是p被垃圾回收,将自动调用此函数

 

23,建立进程池

  23.1,

import os
import time
from multiprocessing import Pool
def func(i):
    time.sleep(1)     # 结果是五个五个的出来,即同时作五个任务。
    print(i,os.getpid())  # 每五个进程的pid是同样的。
# 注意其实输出没顺序,各进程是异步的。

if __name__ == '__main__':
    p = Pool(5)          # 建立能容纳5个进程的进程池对象  
    p.map(func,range(20))  # 向池中提交任务

  23.2,

import os
import time
from multiprocessing import Pool
def func(i):
    time.sleep(1)     # 结果是五个五个的出来,即同时作五个任务。
    print(i,os.getpid())  # 每五个进程的pid是同样的。
# 注意其实输出没顺序,各进程是异步的。

if __name__ == '__main__':
    p = Pool(5)          
    p.map(func,range(20))  
    p.close()   # 是不容许再向进程池中添加任务,不是关闭对象。
    p.join()    # 必须close以后在join
    print('====') # 加上上两句,才保证任务处理完以后,在执行这一句。
	           # 由于是异步的,不加上两句,这句可能会在,任务没处理完就执行了。

  23.3,

import os
import time
import random
from multiprocessing import Pool
from multiprocessing import Process
def func(i):
    i += 1

if __name__ == '__main__':
# 设置进度池,完成一百个任务(用时短,占内存小)
# 只开可5个进程
    p = Pool(5)          # 建立了5个进程
    start = time.time()
    p.map(func,range(1000))   # target = func  args=next(iterable) 必须是可迭代的 
		              #要传多个用元组 [(1,2,3),1,2,3,4]  
    p.close()  
    p.join()
    print(time.time() - start)
    start = time.time()

#  开一百个进程,完成一百个任务(用时长,占内存多)
#  开了一百个进程
    l = []
    for i in range(1000):
        p = Process(target=func,args=(i,))  # 建立了一百个进程
        p.start()
        l.append(p)
    [i.join() for i in l]    # 等待全部进程结束
    print(time.time() - start)

24,进程池的  apply  方法

 

import time
from multiprocessing import Pool

def func(i):
    time.sleep(1)
    i += 1

if __name__ == '__main__':
    p = Pool(5)
    for i in range(20):
        # p.apply(func,args=(i,))     # apply是同步提交任务的机制,执行完一个任务才提交下一个任务。多进程等于没开。
        p.apply_async(func,args=(i,))  # apply_async是异步提交任务的机制
    p.close()
    p.join()

 

回调函数

25,回调函数

  25.1,回调函数的使用

import os
from multiprocessing import Pool
def func(i):    # 多进程中的io多,
    print('子进程%s:%s'%(i,os.getpid()))   # 子进程pid
    return i*'*'

def call(arg):   # 回调函数是在主进程中完成的,不能传参数,只能接受多进程中函数的返回值
    print('回调 :',os.getpid())       # 回调函数pid
    print(arg)

if __name__ == '__main__':
    print('---->',os.getpid())    # 主进程pid
    p = Pool(5)
    for i in range(10):
        p.apply_async(func,args=(i,),callback=call)   
	# callback设置回调函数,即执行完func后返回的值不会再返回到调用它的地方,而是放回给回调函数作参数。
    p.close()
    p.join()

  25.2,例子1

# 请求网页
    # 网络延时 IO操做
# 单进程
    # 10个页面  同时访问多个   —> 用多进程
    # 分析页面:——>用回调函数

# from urllib.request import urlopen       # 这个模块须要安装
# import requests
# ret = requests.get('http://www.baidu.com')
# print(ret.text)   # 整个网页的源代码
# print(ret.status_code)  # 访问网页的状态码

  25.2,例子2

from urllib.request import urlopen
import requests
from multiprocessing import Pool

def get_url(url):
    ret = requests.get(url)
    return {'url':url,
            'status_code':ret.status_code,
            'content':ret.text}


def parser(dic):
    print(dic['url'],dic['status_code'],len(dic['content']))
    # 把分析结果写到文件里

if __name__ == '__main__':
    url_l = [
        'http://www.baidu.com',
        'http://www.sogou.com',
        'http://www.hao123.com',
        'http://www.yangxiaoer.cc',
        'http://www.python.org'
    ]
    p = Pool(4)
    for url in url_l:
        p.apply_async(get_url,args=(url,),callback=parser)
    p.close()
    p.join()

  25.3,爬虫实例

 

import re
from urllib.request import urlopen
from multiprocessing import Pool

def get_page(url,pattern):
    response=urlopen(url).read().decode('utf-8')
    return pattern,response

def parse_page(info):
    pattern,page_content=info
    res=re.findall(pattern,page_content)
    for item in res:
        dic={
            'index':item[0].strip(),
            'title':item[1].strip(),
            'actor':item[2].strip(),
            'time':item[3].strip(),
        }
        print(dic)
if __name__ == '__main__':
    regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>'
    pattern1=re.compile(regex,re.S)

    url_dic={
        'http://maoyan.com/board/7':pattern1,
    }

    p=Pool()
    res_l=[]
    for url,pattern in url_dic.items():
        res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
        res_l.append(res)

    for i in res_l:
        i.get()
爬虫实例

 

  25.4,若是在主进程中等待进程池中全部任务都执行完毕后,再统一处理结果,则无需回调函数

from multiprocessing import Pool
import time,random,os

def work(n):
    time.sleep(1)
    return n**2
if __name__ == '__main__':
    p=Pool()

    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,))
        res_l.append(res)

    p.close()
    p.join() #等待进程池中全部进程执行完毕

    nums=[]
    for res in res_l:
        nums.append(res.get()) #拿到全部结果
    print(nums) #主进程拿到全部的处理结果,能够在主进程中进行统一进行处理
无需回调函数

进程池的其余实现方式:https://docs.python.org/dev/library/concurrent.futures.html

相关文章
相关标签/搜索