day20-多并发编程基础(一)

从新写一下吧,系统奔溃了,之前写的彻底没了,悲催,今日主要写进程python

1. 进程的理论知识linux

2. python中的进程操做nginx

开始今日份整理,加油,你是最胖的!!!web

 

1. 进程的理论知识redis

1.1 操做系统的背景知识算法

  顾名思义,进程即正在执行的一个过程。进程是对正在运行程序的一个抽象。数据库

  进程的概念起源于操做系统,是操做系统最核心的概念,也是操做系统提供的最古老也是最重要的抽象概念之一。操做系统的其余全部内容都是围绕进程的概念展开的。macos

PS:即便能够利用的cpu只有一个(早期的计算机确实如此),也能保证支持(伪)并发的能力。将一个单独的cpu变成多个虚拟的cpu(多道技术:时间多路复用和空间多路复用+硬件上支持隔离),没有进程的抽象,现代计算机将不复存在。编程

必备的理论基础:json

#一 操做系统的做用:
    1:隐藏丑陋复杂的硬件接口,提供良好的抽象接口
    2:管理、调度进程,而且将多个进程对硬件的竞争变得有序

#二 多道技术:
    1.产生背景:针对单核,实现并发
    ps:
    如今的主机通常是多核,那么每一个核都会利用多道技术
    有4个cpu,运行于cpu1的某个程序遇到io阻塞,会等到io结束再从新调度,会被调度到4个
    cpu中的任意一个,具体由操做系统调度算法决定。
    
    2.空间上的复用:如内存中同时有多道程序
    3.时间上的复用:复用一个cpu的时间片
       强调:遇到io切,占用cpu时间过长也切,核心在于切以前将进程的状态保存下来,这样
            才能保证下次切换回来时,能基于上次切走的位置继续运行

#三 I/O操做
    #i = input 输入
    从任何其余地方到--->内存
    读文件,json.load,从网络上接收信息
    #o = output 输出
    内存--->放到任何其余地方
    写文件,json.dump,向网络上发送数据 send sendto

1.2 进程的概念以及特性

  进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操做系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。

  • 狭义定义:进程是正在运行的程序的实例(an instance of a computer program that is being executed)。
  • 广义定义:进程是一个具备必定独立功能的程序关于某个数据集合的一次运行活动。它是操做系统动态执行的基本单元,在传统的操做系统中,进程既是基本的分配单元,也是基本的执行单元。

进程的概念

第一,进程是一个实体。每个进程都有它本身的地址空间,通常状况下,包括文本区域(text region)、数据区域(data region)和堆栈(stack region)。文本区域存储处理器执行的代码;数据区域存储变量和进程执行期间使用的动态分配的内存;堆栈区域存储着活动过程调用的指令和本地变量。
第二,进程是一个“执行中的程序”。程序是一个没有生命的实体,只有处理器赋予程序生命时(操做系统执行之),它才能成为一个活动的实体,咱们称其为进程。[3] 
进程是操做系统中最基本、重要的概念。是多道程序系统出现后,为了刻画系统内部出现的动态状况,描述系统内部各道程序的活动规律引进的一个概念,全部多道程序设计操做系统都创建在进程的基础上。

操做系统引入进程的概念的缘由

从理论角度看,是对正在运行的程序过程的抽象;
从实现角度看,是一种数据结构,目的在于清晰地刻画动态系统的内在规律,有效管理和调度进入计算机系统主存储器运行的程序。

进程的特征

动态性:进程的实质是程序在多道程序系统中的一次执行过程,进程是动态产生,动态消亡的。
并发性:任何进程均可以同其余进程一块儿并发执行
独立性:进程是一个能独立运行的基本单位,同时也是系统分配资源和调度的独立单位;
异步性:因为进程间的相互制约,使进程具备执行的间断性,即进程按各自独立的、不可预知的速度向前推动
结构特征:进程由程序、数据和进程控制块三部分组成。
多个不一样的进程能够包含相同的程序:一个程序在不一样的数据集里就构成不一样的进程,能获得不一样的结果;可是执行过程当中,程序不能发生改变。

进程与程序的区别

程序是指令和数据的有序集合,其自己没有任何运行的含义,是一个静态的概念。
而进程是程序在处理机上的一次执行过程,它是一个动态的概念。
程序能够做为一种软件资料长期存在,而进程是有必定生命期的。
程序是永久的,进程是暂时的。

注意:同一个程序执行两次,就会在操做系统中出现两个进程,因此咱们能够同时运行一个软件,分别作不一样的事情也不会混乱。

1.3 进程的调用方法

要想多个进程交替进行,操做系统必须对这些进程进行调度,这个调度也不是随即进行的,而是须要遵循必定的法则,由此就有了进程的调度算法。主要有

  • 先来先服务调度方法
  • 短做业优先调度方法
  • 时间片轮转法,现代操做系统经常使用
  • 多级反馈队列,多种调度方法的结合体

1.4 进程的并发与并行

并行 : 并行是指二者同时执行,好比赛跑,两我的都在不停的往前跑;(资源够用,好比三个线程,四核的CPU )

并发 : 并发是指资源有限的状况下,二者交替轮流使用资源,好比一段路(单核CPU资源)同时只能过一我的,A走一段后,让给B,B用完继续给A ,交替使用,目的是提升效率。

区别:

并行是从微观上,也就是在一个精确的时间片刻,有不一样的程序在执行,这就要求必须有多个处理器。
并发是从宏观上,在一个时间段上能够看出是同时执行的,好比一个服务器同时处理多个session。

1.5 同步\异步\阻塞\非阻塞

  1.5.1 进程的状态

在了解其余概念以前,咱们首先要了解进程的几个状态。在程序运行的过程当中,因为被操做系统的调度算法控制,程序会进入几个状态:就绪,运行和阻塞。

  (1)就绪(Ready)状态

  当进程已分配到除CPU之外的全部必要的资源,只要得到处理机即可当即执行,这时的进程状态称为就绪状态。

  (2)执行/运行(Running)状态当进程已得到处理机,其程序正在处理机上执行,此时的进程状态称为执行状态。

  (3)阻塞(Blocked)状态正在执行的进程,因为等待某个事件发生而没法执行时,便放弃处理机而处于阻塞状态。引发进程阻塞的事件可有多种,例如,等待I/O完成、申请缓冲区不能知足、等待信件(信号)等。

下面这些就是我最蒙圈的一部份内容

  1.5.2 同步与异步

  • 所谓同步就是一个任务的完成须要依赖另一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列。要么成功都成功,失败都失败,两个任务的状态能够保持一致。
  • 所谓异步是不须要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工做,依赖的任务也当即执行,只要本身完成了整个任务就算完成了。至于被依赖的任务最终是否真正完成,依赖它的任务没法肯定,因此它是不可靠的任务序列

  1.5.3 阻塞与非阻塞

  阻塞和非阻塞这两个概念与程序(线程)等待消息通知(无所谓同步或者异步)时的状态有关。也就是说阻塞与非阻塞主要是程序(线程)等待消息通知时的状态角度来讲的

  1.5.4 同步/异步与阻塞/非阻塞

  1. 同步阻塞形式

  效率最低。拿上面的例子来讲,就是你专心排队,什么别的事都不作。

  1. 异步阻塞形式

  若是在银行等待办理业务的人采用的是异步的方式去等待消息被触发(通知),也就是领了一张小纸条,假如在这段时间里他不能离开银行作其它的事情,那么很显然,这我的被阻塞在了这个等待的操做上面;

  异步操做是能够被阻塞住的,只不过它不是在处理消息时阻塞,而是在等待消息通知时被阻塞。

  1. 同步非阻塞形式

  其实是效率低下的。

  想象一下你一边打着电话一边还须要抬头看到底队伍排到你了没有,若是把打电话和观察排队的位置当作是程序的两个操做的话,这个程序须要在这两种不一样的行为之间来回的切换,效率可想而知是低下的。

  1. 异步非阻塞形式

  效率更高,

  由于打电话是你(等待者)的事情,而通知你则是柜台(消息触发机制)的事情,程序没有在两种不一样的操做中来回切换

  好比说,这我的忽然发觉本身烟瘾犯了,须要出去抽根烟,因而他告诉大堂经理说,排到我这个号码的时候麻烦到外面通知我一下,那么他就没有被阻塞在这个等待的操做上面,天然这个就是异步+非阻塞的方式了。

  不少人会把同步和阻塞混淆,是由于不少时候同步操做会以阻塞的形式表现出来,一样的,不少人也会把异步和非阻塞混淆,由于异步操做通常都不会在真正的IO操做处被阻塞

1.6 进程的建立与结束

  1.6.1 进程的建立

  但凡是硬件,都须要有操做系统去管理,只要有操做系统,就有进程的概念,就须要有建立进程的方式,一些操做系统只为一个应用程序设计,好比微波炉中的控制器,一旦启动微波炉,全部的进程都已经存在。

  而对于通用系统(跑不少应用程序),须要有系统运行过程当中建立或撤销进程的能力,主要分为4中形式建立新的进程:

  1. 系统初始化(查看进程linux中用ps命令,windows中用任务管理器,前台进程负责与用户交互,后台运行的进程与用户无关,运行在后台而且只在须要时才唤醒的进程,称为守护进程,如电子邮件、web页面、新闻、打印)

  2. 一个进程在运行过程当中开启了子进程(如nginx开启多进程,os.fork,subprocess.Popen等)

  3. 用户的交互式请求,而建立一个新进程(如用户双击暴风影音)

  4. 一个批处理做业的初始化(只在大型机的批处理系统中应用)

  不管哪种,新进程的建立都是由一个已经存在的进程执行了一个用于建立进程的系统调用而建立的。

  1.6.2 进程的结束

       1. 正常退出(自愿,如用户点击交互式页面的叉号,或程序执行完毕调用发起系统调用正常退出,在linux中用exit,在windows中用ExitProcess)

  2. 出错退出(自愿,python a.py中a.py不存在)

  3. 严重错误(非自愿,执行非法指令,如引用不存在的内存,1/0等,能够捕捉异常,try...except...)

  4. 被其余进程杀死(非自愿,如kill -9)

2. python中的进程操做

  2.1 进程建立的俩种方式

进程的实例化

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化获得的对象,表示一个子进程中的任务(还没有启动)

强调:
1. 须要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

参数介绍:
1 group参数未使用,值始终为None
2 target表示调用对象,即子进程要执行的任务
3 args表示调用对象的位置参数元组,args=(1,2,'egon',)
4 kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
5 name为子进程的名称

方式一:调用直接模块实现

import time
from multiprocessing import Process

def funk(name):
    print('%s is running!'%name)
    time.sleep(2)
    print('%s is done!'%name)


if __name__ == '__main__':
    p1 = Process(target= funk,args=('子进程1',))
    p1.start()
    print('')

方式二:自建类,

import time
from multiprocessing import Process

class MyProcess(Process):
    def __init__(self,name):
        super().__init__()
        self.name = name

    def run(self):#这里必须是RUN函数,不然没法调用
        print('%s is running!'%self.name)
        time.sleep(2)
        print('%s is done!'%self.name)

if __name__ == '__main__':
    p1 = MyProcess('子进程1')

    p1.start()
    print('主进程!')

  2.2 进程id的查询

第一种方式

from multiprocessing import Process

def func1(name):
    print('%s is running'%name)
    print('%s is done'%name)

if __name__ =='__main__':
    p1 = Process(target= func1,args=('子进程1',))
    p1.start()
    print(p1.pid)
    print('')

第二种方式

from multiprocessing import Process
import os

def func2(name):
    print('%s is running'%name)
    print('子进程为%s,父进程为%s'%(os.getpid(),os.getppid()))

if __name__ == '__main__':
    p2 = Process(target= func2,args=('子进程1',))
    p2.start()
    print('')
    print('主进程为',os.getpid())

注意:补充内容,暂时没想到调整放在哪里

1.几个概念:
    父进程:父进程执行的过程当中建立了子进程
    子进程:由父进程建立
    主进程:通常咱们直接执行的那个程序就是主进程,在pycharm中主进程的父进程就是pycharm的进程pid

2.windows为何要有 if __name__ == '__main__':
    windows在开子进程的时候,会从父进程中引入文件,并执行一遍,会变成嵌套
    linux macos中只是复制一分内存空间,并无执行

3.如何 开启多个子进程

4.如何给子进程传参

from multiprocessing import Process
import os

def func(name):
    print('%s is start,id is %s,father id is %s'%(name,os.getpid(),os.getppid()))

if __name__ == '__main__':
    for i in range(10):
        p = Process(target= func,args= ('子进程%s'%i,))#这个时候传参里面必须是集合
        p.start()#非阻塞,只要告诉系统开启就好,开启并非真正的开启

5.子进程有没有返回值
不能有返回值
由于子进程函数中的函数返回值不能传递给父进程,由于内存隔离的问题

  2.3 孤儿进程与僵尸进程

僵尸进程

  • 有害
  • 父进程对子进程收尸,而子进程一直未关闭,父进程一直在等待

孤儿进程

  • 无害
  • 父进程挂了,子进程未挂,子进程会被__init__接管,一段时候后将子进程关闭

  2.4 进程对象的其余属性

p.join():主进程等待子进程结束,这个时候其余子进程也是能够运行,是并发操做的
主进程会默认等待子进程结束后才结束,父进程会回收子进程占用的资源,join是单阻塞,n个进程用join控制
from multiprocessing import Process

def func(n):
print('发送邮件%s'%n)

if __name__ == '__main__':
p_list =[]
for i in range(10):
p = Process(target= func,args=(i,))
p_list.append(p)
p.start()
for p in p_list:
p.join()
print('所有邮件已经发送成功!')

p.pid():查看进程的id号

p.is_alive():查看进程是否活着

p.terminate(): 进程回收,强制结束一个进程,也是非阻塞的

p.name :进程名,在开进程的时候也能够加name参数,对子进程重命名

  2.5 守护进程

主进程建立进程,有三个特色

  • 守护进程会在主进程代码结束后终止
  • 守护进程没法再开启子进程
  • 守护进程只守护主进程的代码结束,不守护子进程,若是真要守护子进程,则对子进程join

守护进程须要在进程开启前设置

#守护进程小测试,设定守护进程对主函数以及子进程都守护!
from multiprocessing import Process
import time

def eye():
    while True:
        print('SERVER,我很好!')
        time.sleep(1)

def func2():
    print('进程2要作的事情!')
    time.sleep(8)
    print('进程2已经结束!')

def main():
    print('作我主要的事情')
    time.sleep(5)
    print('done')

if __name__ == '__main__':
    p = Process(target= eye,)
    p2 = Process(target=func2)
    p.daemon = True
    p.start()
    p2.start()
    main()
    p2.join()

  2.6 进程的同步控制--互斥锁

将本来共享的内容进行加锁,保证这个资源一次性只有一个在运行。加锁后将本来并发的进程变为串行,父进程创建锁,子进程调用锁。

首先看未加锁的状况。

from multiprocessing import Process,Lock
import time

def func(name):
    print('1,%s is running'%name)
    time.sleep(1)
    print('2,%s is runing!'%name)
    time.sleep(1)
    print('3,%s is running'%name)


if __name__ =='__main__':
    for i in range(3):
        ret = Process(target= func,args=('%s子进程'%i,))
        ret.start()

加锁后

from multiprocessing import Process,Lock
import time

def func(name,mutex):
    mutex.acquire()#进程加锁
    print('1,%s is running'%name)
    time.sleep(1)
    print('2,%s is runing!'%name)
    time.sleep(1)
    print('3,%s is running'%name)
    mutex.release()#进程解锁


if __name__ =='__main__':
    mutex = Lock()
    for i in range(3):
        ret = Process(target= func,args=('%s子进程'%i,mutex))
        ret.start()

join与互斥锁的区别

  • join是主进程总体堵塞,
  • 互斥锁是对须要修改的地方作修改,保证数据的安全

模拟一次抢票的过程

import json
import time
from multiprocessing import Process,Lock

def show_ticket(name):
    with open('test.txt','r')as f1:
        file =json.load(f1)
    if file['number'] >=0:
        print('%s 查询余票数量,余票数量为%s'%(name,file['number']))
    else:
        print('%s 查询余票数量,余票为0')

def get_ticket(name):
    with open('test.txt','r')as f1:
        file = json.load(f1)
    time.sleep(0.2)
    if file['number'] >0:
        file['number'] -= 1
        print('%s 购票成功!'%name)
    else:
        print('余票不足,%s 购票失败'%name)
    time.sleep(0.2)
    with open('test.txt', 'w')as f2:
        json.dump(file, f2)

def func(name,lock):
    show_ticket(name)
    lock.acquire()
    get_ticket(name)
    lock.release()


if __name__ == '__main__':
    lock = Lock()
    p_list = []
    for i in range(10):
        p = Process(target=func,args= ('购票者%s'%i,lock))
        p.start()
  • 多个进程抢占同一个数据资源会形成数据不安全
  • 咱们必须牺牲效率来保证数据的安全性

  2.7 进程间的通行--队列与管道

   2.7.1 队列就是基于(管道+锁)实现的,队列中内存通讯

q =Queue():实例化一个管道

q.full():判断队列中是否满,队列是先进先进的体现

q.empty():清空队列

q.put():添加到队列中

q.get():从队列中得到一个对象

队列维护了一个秩序,先进先出(FIFO)

进程之间的通讯

IPC:Iter Process Communication

Pipe:管道,没有锁是进程之间数据不安全的机制,而管道+锁==队列,

队列:是一个黑箱模块,而队列就是进程之间数据安全机制

第三方工具(消息中间件):memcache、redis、kafka、rabbitm

   2.7.2 队列的模型---生产者消费者模型

   2.7.2.1 为何要使用生产者消费者模型

生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题因而引入了生产者和消费者模式。

   2.7.2.2 什么是生产者消费者模型

生产者消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而经过阻塞队列来进行通信,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力。

这个阻塞队列就是用来给生产者和消费者解耦的

生产者:是进程

消费者:是进程

生产者和消费者之间,传递数据,是须要一个盘子(IPC)

以爬虫为例,使用生产者消费者模型是为了平衡供需之间的关系

TIM图片20190210095950

以吃包子为例,看如下代码

from multiprocessing import Queue,Process
import time

def consumer(name,q):
    while True:
        msg = q.get()#get 会阻塞,直到队列中有一个数据
        if not msg :break
        time.sleep(0.7)
        print('%s 抢着吃了%s'%(name,msg))


def producer(name,produce,q):
    for i in range(20):
        msg = '%s %s'%(produce,i)
        q.put(msg)#因为整个队列大小只有5,在最大数量时,阻塞
        print('%s 生产了%s'%(name,msg))
        time.sleep(0.2)


if __name__ == '__main__':
    q = Queue(5)
    p1 = Process(target=consumer,args=('饭桶1',q))
    p2 = Process(target=consumer,args=('饭桶2',q))
    p3 = Process(target=producer,args=('生产者','包子',q))
    p3.start()
    p1.start()
    p2.start()
    p3.join()#确认全部的生产者已经所有生产所有数据
    q.put(None)#发送最终标志位,确认消费者收到明确的标志
    q.put(None)

在队列中还有一个JoinaLequeue的使用

这就像是一个Queue对象,但队列容许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

一些经常使用方法

q =JoinableQueue()#实例化一个对象

q.join()#生产者调用此方法进行阻塞,直到队列中全部的项目均被处理。阻塞将持续到队列中的每一个项目均调用q.task_done()方法为止

q.task_done() #使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。若是调用此方法的次数大于从队列中删除项目的数量,将引起ValueError异常

针对这一个能够能够对上面的吃包子进行一个多生产者进行改进

#2.0版本,使用JoinableQueue()
from multiprocessing import JoinableQueue,Process
import time

def consumer(name,q):
    while True:
        msg = q.get()#get 会阻塞,直到队列中有一个数据
        if not msg :break
        time.sleep(0.7)
        print('%s 抢着吃了%s'%(name,msg))
        q.task_done()#消费者告诉生产者他已经吃了一个


def producer(name,produce,q):
    for i in range(20):
        msg = '%s %s'%(produce,i)
        q.put(msg)#因为整个队列大小只有5,在最大数量时,阻塞
        print('%s 生产了%s'%(name,msg))
        time.sleep(0.2)
    q.join()#确保生产者生产的东西所有被吃完


if __name__ == '__main__':
    q = JoinableQueue(5)
    c1 = Process(target=consumer,args=('饭桶1',q))
    c1.daemon = True#生产者生产完以后,消费者也没有存在的必要
    c2 = Process(target=consumer,args=('饭桶2',q))
    c2.daemon = True#生产者生产完以后,消费者也没有存在的必要
    p1 = Process(target=producer,args=('生产者','包子',q))
    p2 = Process(target=producer,args=('生产者','叉烧包',q))
    p_list =[c1,c2,p1,p2]
    for p in p_list:
        p.start()
    p1.join()#确认全部的生产者已经所有生产所有数据
    p2.join()#确认全部的生产者已经所有生产所有数据

  2.8 进程间的数据共享(了解)

进程间应该尽可能避免通讯,即使须要通讯,也应该选择进程安全的工具来避免加锁带来的问题。

之后咱们会尝试使用数据库来解决如今进程之间的数据共享问题。

进程间数据是独立的,能够借助于队列或管道实现通讯,两者都是基于消息传递的虽然进程间数据独立,但能够经过Manager实现数据共享,事实上Manager的功能远不止于此

下面代码示例图

TIM图片20190210113833

#目的:开启100个进程,分别对共享数据减一操做
#方式一(失败):
from multiprocessing import Manager,Process

def work(dict):
    dict['count']-=1

if __name__ == '__main__':
    m = Manager()
    dic = m.dict({'count':100})
    p_list =[]
    for i in range(100):
        p = Process(target=work,args=(dic,))
        p_list.append(p)
        p.start()
    for p in p_list:
        p.join()

    print(dic['count'])
#结果会发现数据不是咱们预期的0,pc的内核越多,结果会越大


#方式二
from multiprocessing import Manager,Process,Lock
#Manager会提供不少数据共享的机制,可是对于一些基础数据类型,他是数据不安全的
#须要咱们本身提供加锁操做

def work(dict,lock):
    # lock.acquire()#加锁操做
    # dict['count']-=1
    # lock.release()#解锁操做
    with lock:#也能够用这个,上下文管理机制
        dict['count']-=1#上文就是lock.acquire(),下文就是lock.release()

if __name__ == '__main__':
    lock = Lock()
    m = Manager()
    dic = m.dict({'count':100})
    p_list =[]
    for i in range(100):
        p = Process(target=work,args=(dic,lock))
        p_list.append(p)
        p.start()
    for p in p_list:
        p.join()

    print(dic['count'])

虽然说有manager这个管理机制,可是咱们平常中仍是不会使用这个,通常是使用第三方工具库,来操做数据共享,毕竟不加锁的数据共享是数据不安全的。

最后的结论就是平常工做中就不要使用manager这个机制,纯粹就是给本身找麻烦。

注意:在多进程中或者是多线程中,数据共享,若是没有加锁,+=,-=,*=,/=都是数据不安全的,必须加锁!

  2.9 进程池的概念以及使用

   2.9.1 进程池

进程池的概念:

  在程序实际处理问题过程当中,忙时会有成千上万的任务须要被执行,闲时可能只有零星任务。那么在成千上万个任务须要被执行的时候,咱们就须要去建立成千上万个进程么?首先,建立进程须要消耗时间,销毁进程也须要消耗时间。第二即使开启了成千上万的进程,操做系统也不能让他们同时执行,这样反而会影响程序的效率。所以咱们不能无限制的根据任务开启或者结束进程。那么咱们要怎么作呢?

  在这里,要给你们介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等处处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。若是有不少任务须要执行,池中的进程数量不够,任务就要等待以前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增长操做系统的调度难度,还节省了开闭进程的时间,也必定程度上可以实现并发效果。

multiprocess.Pool模块

#1.实例化进程池的方法
Pool([numprocess  [,initializer [, initargs]]]):建立进程池

#2.参数介绍
1 numprocess:要建立的进程数,若是省略,将默认使用cpu_count()的值
2 initializer:是每一个工做进程启动时要执行的可调用对象,默认为None
3 initargs:是要传给initializer的参数组

#3.主要方法
1 p.apply(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。
2 '''须要强调的是:此操做并不会在全部池工做进程中并执行func函数。若是要经过不一样参数并发地执行func函数,必须从不一样线程调用p.apply()函数或者使用p.apply_async()'''
3 
4 p.apply_async(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。
5 '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操做,不然将接收其余异步操做中的结果。'''
6    
7 p.close():关闭进程池,防止进一步操做。若是全部操做持续挂起,它们将在工做进程终止前完成
8 
9 P.jion():等待全部工做进程退出。此方法只能在close()或teminate()以后调用

注意:开启进程池的时候,若是在开启时没有填写数字,默认都是cpu的核心数量!

比较经常使用的就是p.map(funcation,iterable)、p.apply_async(funcation,args=()),不一样的是p.apply_async须要用close和join来保证进程池的结束!

实际开启一个进程池,而且看一下每一个进程的id

from multiprocessing import Pool
import os
import time

def func(i):
    print(i,os.getpid())
    time.sleep(0.2)

if __name__ == '__main__':
    p =Pool(4)#设定池子最大进程数,通常为cpu+1/cpu数量
    for i in range(20):
        p.apply_async(func,args=(i,))#异步的提交任务
    p.close()#关闭池子,不是阻止进程池中已经有的进程,而是阻止再向池中异步的提交任务
    p.join()#阻塞池子

   2.9.2 进程池以及多进程的性能测试

  以上面的例子在加上开启多个进程咱们能够明显看出多进程与进程池之间有明显的效率

#进程池以及多进程的效率测试
from multiprocessing import Pool,Process
import os
import time

def func(i):
    print(i,os.getpid())


if __name__ == '__main__':
    start_time = time.time()
    p_list =[]
    for i in range(100):
        p = Process(target= func,args=(i,))
        p_list .append(p)
        p.start()
    for p in p_list:
        p.join()
    end_time = time.time()
    process_time = end_time-start_time

    start_time = time.time()
    p =Pool(4)#设定池子最大进程数,通常为cpu+1/cpu数量
    for i in range(20):
        p.apply_async(func,args=(i,))#异步的提交任务
    p.close()#关闭池子,不是阻止进程池中已经有的进程,而是阻止再向池中异步的提交任务
    p.join()#阻塞池子
    end_time = time.time()
    pool_time = end_time-start_time

    print(process_time,pool_time)

#结果

7.379263877868652 0.4368298053741455,咱们能够发现效率是有明显的差异

若是咱们有多个任务,就开启多少个进程,实际上对于咱们来讲,是很是不划算的,因为计算机的cpu数量时有限的,因此咱们起的进程数量是彻底和cpu的个数成比例的。

起多进程的意义

  • 为了更好的利用cpu,因此若是咱们的程序中都是网络i/o以及文件i/o,就不适合起多进程
  • 为了数据隔离,若是咱们的程序中老是要用到数据共享,那么就不适用使用多进程
  • 超过了cpu个数的任务数,都应该用进程池来解决问题,不能无限的开启子进程

    2.9.3 进程池的其余机制

对于开启进程池还有其余方法,方法以下

#直接用map函数,俩步搞定,哈哈哈
from multiprocessing import Pool
import os
import time

def func(i):
    print(i,os.getpid())
    time.sleep(0.2)

if __name__ == '__main__':
    p =Pool(4)#设定池子最大进程数,通常为cpu+1/cpu数量
    p.map(func,range(20))
    # for i in range(20):
    #     p.apply_async(func,args=(i,))#异步的提交任务
    # p.close()#关闭池子,不是阻止进程池中已经有的进程,而是阻止再向池中异步的提交任务
    # p.join()#阻塞池子

2.9.4 进程池的返回值

2.9.4.1 普通函数实现进程的返回值

#普通函数方法
from multiprocessing import Pool

def func(i):
    print('爬取的网页内容%s'%i)
    return len('网页大小!')*i

if __name__ == '__main__':
    p = Pool(5)
    ret_l =[]
    for i in range(10):
        ret =p.apply_async(func,args =(i,))
        ret_l.append(ret)
    for ret in ret_l:
        print(ret.get())
    p.close()
    p.join()

2.9.4.2 map函数实现进程的返回值

#map函数方法
from multiprocessing import Pool

def func(i):
    print('爬取网页的大小%s'%i)
    return len('网页大小!')*i

if __name__ == '__main__':
    p = Pool(5)
    ret =p.map(func,range(10))
    for i in ret:
        print(i)

2.9.4.3 回调函数

模拟一个爬虫爬网页的状况,

正常获取网页,一个一个获取后进行统计

#正常版
from multiprocessing import Pool
import time
import random


def func(i):
    print('爬取的网页的内容%s'%i)
    time.sleep(random.random())
    return len('网页大小!')*i

if __name__ == '__main__':
    p = Pool(5)
    ret_l = []
    for i in range(10):
        ret =p.apply_async(func,args =(i,))
        ret_l.append(ret)
    for ret in ret_l:
        print(ret.get())
    p.close()
    p.join()

第二个版本

#采用回调函数的版本
from multiprocessing import Pool
import time
import random


def func(i):#子进程调用
    time.sleep(random.random())
    print('爬取的网页的内容%s'%i)
    return len('网页大小!')*i

def call_back(contact):#主进程调用
    print(contact)

if __name__ == '__main__':
    p = Pool(5)
    for i in range(20):
        ret =p.apply_async(func,args =(i,),callback=call_back)
    p.close()
    p.join()
#结果
爬取的网页的内容2
10
爬取的网页的内容3
15
爬取的网页的内容1
5
爬取的网页的内容0
0
爬取的网页的内容6
30
爬取的网页的内容4
20
爬取的网页的内容5
25
爬取的网页的内容7
35
爬取的网页的内容9
45
爬取的网页的内容8
40

将n个任务交给n个进程去执行

每个进程在执行完毕后会返回一个值,这个返回值会直接交给callback参数指定的那个函数去进行处理,这样的话,全部的进程,哪个执行的最快,哪个就能够最早进行统计或者其余工做,能够在最短期内获取到结果!