Python基础:之进程

1、进程

1.什么是进程

程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程。python

正在进行的一个过程或者说一个任务。而负责执行任务则是cpulinux

2.进程与程序的区别

程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本;进程是程序的一次执行活动,属于动态概念。nginx

须要强调的是:同一个程序执行两次,那也是两个进程,好比打开暴风影音,虽然都是同一个软件,可是一个能够播放苍井空,一个能够播放饭岛爱。web

3.并发与并行

 不管是并行仍是并发,在用户看来都是'同时'运行的,无论是进程仍是线程,都只是一个任务而已,真是干活的是cpu,cpu来作这些任务,而一个cpu同一时刻只能执行一个任务shell

  并行:同时运行,只有具有多个cpu才能实现并行数据库

      并发:是伪并行,即看起来是同时运行。单个cpu+多道技术就能够实现并发,(并行也属于并发)编程

      

 

      全部现代计算机常常会在同一时间作不少件事,一个用户的PC(不管是单cpu仍是多cpu),均可以同时运行多个任务(一个任务能够理解为一个进程)。json

    启动一个进程来杀毒(360软件)windows

    启动一个进程来看电影(暴风影音)安全

    启动一个进程来聊天(腾讯QQ)

  全部的这些进程都需被管理,因而一个支持多进程的多道程序系统是相当重要的

      多道技术:内存中同时存入多道(多个)程序,cpu从一个进程快速切换到另一个,使每一个进程各自运行几十或几百毫秒,这样,虽然在某一个瞬间,一个cpu只能执行一个任务,但在1秒内,cpu却能够运行多个进程,这就给人产生了并行的错觉,即伪并发,以此来区分多处理器操做系统的真正硬件并行(多个cpu共享同一个物理内存)

4.同步异步

同步就是指一个进程在执行某个请求的时候,若该请求须要一段时间才能返回信息,那么这个进程将会一直等待下去,直到收到返回信息才继续执行下去;

    异步是指进程不须要一直等下去,而是继续执行下面的操做,无论其余进程的状态。当有消息返回时系统会通知进程进行处理,这样能够提升执行的效率。

    举个例子,打电话时就是同步通讯,发短息时就是异步通讯。

5.进程的建立

 但凡是硬件,都须要有操做系统去管理,只要有操做系统,就有进程的概念,就须要有建立进程的方式,一些操做系统只为一个应用程序设计,好比微波炉中的控制器,一旦启动微波炉,全部的进程都已经存在。

  而对于通用系统(跑不少应用程序),须要有系统运行过程当中建立或撤销进程的能力,主要分为4中形式建立新的进程

  1. 系统初始化(查看进程linux中用ps命令,windows中用任务管理器,前台进程负责与用户交互,后台运行的进程与用户无关,运行在后台而且只在须要时才唤醒的进程,称为守护进程,如电子邮件、web页面、新闻、打印)

  2. 一个进程在运行过程当中开启了子进程(如nginx开启多进程,os.fork,subprocess.Popen等)

  3. 用户的交互式请求,而建立一个新进程(如用户双击暴风影音)

  4. 一个批处理做业的初始化(只在大型机的批处理系统中应用)

  

  不管哪种,新进程的建立都是由一个已经存在的进程执行了一个用于建立进程的系统调用而建立的:

  1. 在UNIX中该系统调用是:fork,fork会建立一个与父进程如出一辙的副本,两者有相同的存储映像、一样的环境字符串和一样的打开文件(在shell解释器进程中,执行一个命令就会建立一个子进程)

  2. 在windows中该系统调用是:CreateProcess,CreateProcess既处理进程的建立,也负责把正确的程序装入新进程。

 

  关于建立的子进程,UNIX和windows

  1.相同的是:进程建立后,父进程和子进程有各自不一样的地址空间(多道技术要求物理层面实现进程之间内存的隔离),任何一个进程的在其地址空间中的修改都不会影响到另一个进程。

  2.不一样的是:在UNIX中,子进程的初始地址空间是父进程的一个副本,提示:子进程和父进程是能够有只读的共享内存区的。可是对于windows系统来讲,从一开始父进程与子进程的地址空间就是不一样的。

6.进程的终止

  • 正常退出(自愿,如用户点击交互式页面的叉号,或程序执行完毕调用发起系统调用正常退出,在linux中用exit,在windows中用ExitProcess)
  • 出错退出(自愿,python a.py中a.py不存在)
  • 严重错误(非自愿,执行非法指令,如引用不存在的内存,1/0等,能够捕捉异常,try...except...)
  • 被其余进程杀死(非自愿,如kill -9)

 7.进程的层次结构

 不管UNIX仍是windows,进程只有一个父进程,不一样的是:

  1. 在UNIX中全部的进程,都是以init进程为根,组成树形结构。父子进程共同组成一个进程组,这样,当从键盘发出一个信号时,该信号被送给当前与键盘相关的进程组中的全部成员。

  2. 在windows中,没有进程层次的概念,全部的进程都是地位相同的,惟一相似于进程层次的暗示,是在建立进程时,父进程获得一个特别的令牌(称为句柄),该句柄能够用来控制子进程,可是父进程有权把该句柄传给其余子进程,这样就没有层次了。

8.进程状态

 tail -f access.log |grep '404'

  执行程序tail,开启一个子进程,执行程序grep,开启另一个子进程,两个进程之间基于管道'|'通信,将tail的结果做为grep的输入。

  进程grep在等待输入(即I/O)时的状态称为阻塞,此时grep命令都没法运行

  其实在两种状况下会致使一个进程在逻辑上不能运行,

  1. 进程挂起是自身缘由,遇到I/O阻塞,便要让出CPU让其余进程去执行,这样保证CPU一直在工做

  2. 与进程无关,是操做系统层面,可能会由于一个进程占用时间过多,或者优先级等缘由,而调用其余的进程去使用CPU。

  于是一个进程由三种状态

9.进程并发实现

 进程并发的实如今于,硬件中断一个正在运行的进程,把此时进程运行的全部状态保存下来,为此,操做系统维护一张表格,即进程表(process table),每一个进程占用一个进程表项(这些表项也称为进程控制块)

  该表存放了进程状态的重要信息:程序计数器、堆栈指针、内存分配情况、全部打开文件的状态、账号和调度信息,以及其余在进程由运行态转为就绪态或阻塞态时,必须保存的信息,从而保证该进程在再次启动时,就像从未被中断过同样。

10.python多进程模块

10.1 multiprocessing模块介绍

 

python中的多线程没法利用多核优点,若是想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分状况须要使用多进程。Python提供了很是好用的多进程包multiprocessing。
multiprocessing模块用来开启子进程,并在子进程中执行咱们定制的任务(好比函数),该模块与多线程模块threading的编程接口相似。multiprocessing模块的功能众多:支持子进程、通讯和共享数据、执行不一样形式的同步,提供了Process、Queue、Pipe、Lock等组件。须要再次强调的一点是:与线程不一样,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

10.2 Process建立进程的类

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化获得的对象,表示一个子进程中的任务(还没有启动) 强调: 1. 须要使用关键字的方式来指定参数 2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
参数介绍
1.group参数未使用,值始终为None()
2.target表示调用对象,即子进程要执行的任务
3.args表示调用对象的位置参数元组,args=(1,2,'egon',)
4.kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
5.name为子进程的名称

方法介绍

1.p.start():启动进程,并调用该子进程中的p.run() 

2.p.run():进程启动时运行的方法,正是它去调用target指定的函数,咱们自定义类的类中必定要实现该方法  

3.p.terminate():强制终止进程p,不会进行任何清理操做,若是p建立了子进程,该子进程就成了僵尸进程,使用该方法须要特别当心这种状况。若是p还保存了一个锁那么也将不会被释放,进而致使死锁

4.p.is_alive():若是p仍然运行,返回True

5.p.join([timeout]):主进程等待p终止(强调:是主进程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,须要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程

属性介绍

1.p.daemon:默认值为False,若是设为True,表明p为后台运行的守护进程,当p的父进程终止时,p也随之终止,而且设定为True后,p不能建立本身的新进程,必须在p.start()以前设置

2.p.name:进程的名称

3.p.pid:进程的pid

4.p.exitcode:进程在运行时为None、若是为–N,表示被信号N结束(了解便可)

5.p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络链接的底层进程间通讯提供安全性,这类链接只有在具备相同的身份验证键时才能成功(了解便可)

10.3 Python开启进程的两种方式

注意:在windows中Process()必须放到# if __name__ == '__main__':下

if __name__ == "__main__"
since statements inside this if-statement will not get called upon import.

因为Windows没有fork,多处理模块启动一个新的Python进程并导入调用模块。 
若是在导入时调用Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)。 
这是隐藏对Process()内部调用的原,使用if __name__ == “__main __”,这个if语句中的语句将不会在导入时被调用。

方式一

def piao(name):
print("%s is piaoing"%name)
time.sleep(2)
print("%s is piao end"%name)
if __name__ == '__main__':
p1 = Process(target=piao,args=('egon',),name='p1')
p1.start()#启动主进程调用子进程中的run方法
print('p1 name is %s'%p1.name)#此处代码和start同时运行的并行
print('父类')##此处代码和start同时运行的并行

方式二

继承Process,重写run方法,继承Process类必定要重写run方法,init初始化时super父类init方法在定义本身得

class Piao(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print("%s is piaoing" % self.name)
        # time.sleep(1)
        print("%s is piao end"%self.name)
if __name__ == '__main__':
    p1=Piao('han')
    p1.start()#自动化调用run方法
    print('父类1')
    p2=Piao('cai')
    p2.start()
    print("父类2")

 练习socket并发通信

方式一

SERVER
from multiprocessing import Process from socket import * server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn,addr): while True: #通信循环 try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': while True: #连接循环 conn,addr=server.accept() p=Process(target=talk,args=(conn,addr)) p.start()

 

方式二 

 

#!/usr/bin/Python
# -*- coding:utf-8 -*-
from multiprocessing import Process
from socket import *
class socket_server(Process):
    def __init__(self):
        super(socket_server, self).__init__()

    def run(self):
        server = socket(AF_INET, SOCK_STREAM)
        server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        server.bind(('127.0.0.1', 8081))
        server.listen(5)
        conn, addr = server.accept()
        while True: #通信循环
            try:
                msg=conn.recv(1024)
                if not msg:break
                conn.send(msg.upper())
            except Exception:
                break
if __name__ == '__main__':
    p1 = socket_server()
    p1.start()

 

客户端  

#!/usr/bin/python
# -*- coding:utf-8 -*-
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8081))
while True:
    msg=input('>>').strip()
    if not msg:continue
    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))

10.4进程方法属性

#p.join(),是父进程在等p的结束,是父进程阻塞在原地,而p仍然在后台运行
def piao(name): print('%s is piaoing' % name) time.sleep(random.randint(1,3)) print('%s is piao end' % name) if __name__ == '__main__': p1=Process(target=piao,args=('egon',)) p2=Process(target=piao,args=('alex',)) p3=Process(target=piao,args=('wupeiqi',)) p4=Process(target=piao,args=('yuanhao',)) p_l=[p1,p2,p3,p4] for p in p_l: p.start() for p in p_l: p.join()#父进程等待p运行结束后才执行住进程 print('主进程')
#有的同窗会有疑问:既然join是等待进程结束,那么我像下面这样写,进程不就又变成串行的了吗?
#固然不是了,必须明确:p.join()是让谁等?
#很明显p.join()是让主线程等待p的结束,卡住的是主线程而绝非进程p,

#详细解析以下:
#进程只要start就会在开始运行了,因此p1-p4.start()时,系统中已经有四个并发的进程了
#而咱们p1.join()是在等p1结束,没错p1只要不结束主线程就会一直卡在原地,这也是问题的关键
#join是让主线程等,而p1-p4仍然是并发执行的,p1.join的时候,其他p2,p3,p4仍然在运行,等#p1.join结束,可能p2,p3,p4早已经结束了,这样p2.join,p3.join.p4.join直接经过检测,无需等待
# 因此4个join花费的总时间仍然是耗费时间最长的那个进程运行的时间
#进程对象的其余方法一:terminate,is_alive
from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()

    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,5))
        print('%s is piao end' %self.name)


p1=Piao('egon1')
p1.start()

p1.terminate()#关闭进程,不会当即关闭,因此is_alive马上查看的结果可能仍是存活
print(p1.is_alive()) #结果为True

print('开始')
print(p1.is_alive()) #结果为False

  

#!/usr/bin/Python
# -*- coding:utf-8 -*-
from multiprocessing  import Process
import time
import random
import os
class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()
    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is piao end' %self.name)

if __name__ == '__main__':

    p=Piao('egon')
    p.daemon=True #必定要在p.start()前设置,设置p为守护进程,禁止p建立子进程,而且父进程死,p跟着一块儿死
    p.start()
    p.join(0.0001) #等待p中止,等0.0001秒就再也不等了
    print('开始')
    print(1)
    print(p.is_alive())

 

#!/usr/bin/Python
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time
import random
class Piao(Process):
    def __init__(self,name):
        # self.name=name
        # super().__init__() #Process的__init__方法会执行self.name=Piao-1,
        #                    #因此加到这里,会覆盖咱们的self.name=name

        #为咱们开启的进程设置名字的作法
        super().__init__()

        self.name = name

    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is piao end' %self.name)
if __name__ == '__main__':

    p=Piao('egon')
    p.start()
    print('开始')
    print(p.pid) #查看pid

 

#!/usr/bin/Python
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time
import random
def piao(name):
    print('%s is piaoing' % name)
    time.sleep(random.randint(1,3))
    print('%s is piao end' % name)
if __name__ == '__main__':
    p1=Process(target=piao,args=('egon',))
    p1.name = 'hanjialong'#修改进程名称
    # p1.daemon=True
    p1.start()

    p1.terminate()
    print(p1.is_alive())
    time.sleep(1)
    print(p1.is_alive())

    print('主进程')

    print(p1.name)
    print(p1.pid)

循环开启进程

#!/usr/bin/Python
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time
import random
def foo(name):
    print(name)

if __name__ == '__main__':
    for i in range(1,100):
        p = Process(target=foo, args=('进程%s' % i,))
        p.start()
进程之间数据不共享,可是共享同一套文件系统,因此访问同一个文件,或同一个打印终端,是没有问题的,
part1:共享同一打印终端,发现会有多行内容打印到一行的现象(多个进程共享并抢占同一个打印终端,乱了)
#多进程共享一个打印终端(用python2测试看两个进程同时往一个终端打印,出现打印到一行的错误)
from multiprocessing import Process
import time
class Logger(Process):
    def __init__(self):
        super(Logger,self).__init__()
    def run(self):
        print(self.name)


for i in range(1000000):
    l=Logger()
    l.start()

 线程共享同一个文件,实验:

既然能够用文件共享数据,那么为何通信不用文件做为介质呢?

1.效率问题,使用文件速度太慢

2.须要本身加锁处理

from multiprocessing import Process
import time
import random
#多进程共享一套文件系统

def work(filename,msg):
    with open(filename,'a',encoding='utf-8') as f:
        f.write(msg)

if __name__ == '__main__':

    for i in range(5):
        p=Process(target=work,args=('a.txt','进程%s\n' %str(i)))
        p.start()
        p.join()
        work('a.txt',"主进程%s\n"%str(i))

11 进程间通讯(IPC)

1.队列方式

 进程彼此之间互相隔离,要实现进程间通讯,即IPC,multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

建立队列的类(底层就是以管道和锁定的方式实现)

Queue([maxsize]):建立共享的进程队列,Queue是多进程安全的队列,可使用Queue实现多进程之间的数据传递。

参数介绍

maxsize是队列中容许最大项数,省略则无大小限制。 

方法介绍 

 

1.q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。若是blocked为True(默认值),而且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。若是超时,会抛出Queue.Full异常。若是blocked为False,但该Queue已满,会当即抛出Queue.Full异常。

2.q.get方法能够从队列读取而且删除一个元素。一样,get方法有两个可选参数:blocked和timeout。若是blocked为True(默认值),而且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。若是blocked为False,有两种状况存在,若是Queue有一个值可用,则当即返回该值,不然,若是队列为空,则当即抛出Queue.Empty异常.

3.q.get_nowait():同q.get(False) q.put_nowait():同q.put(False)

4.q.empty():调用此方法时q为空则返回True,该结果不可靠,好比在返回True的过程当中,若是队列中又加入了项目

5.q.full():调用此方法时q已满则返回True,该结果不可靠,好比在返回True的过程当中,若是队列中的项目被取走。

6.q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()同样

 

了解

1 q.cancel_join_thread():不会在进程退出时自动链接后台线程。能够防止join_thread()方法阻塞

2 q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但还没有写入的数据,但将在此方法完成时立刻关闭。若是q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,若是某个使用者正在被阻塞在get()操做上,关闭生产者中的队列不会致使get()方法返回错误。

3 q.join_thread():链接队列的后台线程。此方法用于在调用q.close()方法以后,等待全部队列项被消耗。默认状况下,此方法由不是q的原始建立者的全部进程调用。调用
4.q.cancel_join_thread方法能够禁止这种行为

应用 

 

 
 
multiprocessing模块支持进程间通讯的两种主要形式:管道和队列
都是基于消息传递实现的,可是队列接口
#!/usr/bin/Python
# -*- coding:utf-8 -*-
from multiprocessing import Process,Queue
#队列,先进先出
q=Queue(3)#设置队列长度为3

q.put({'a':1})
q.put('b')
q.put('c')
print(q.full())#此时队列已满因此返回True
# q.put('d',False) #等同于q.put_nowait('d')
print(q.get())
q.put('d',timeout=1)#当超过1秒尚未空间存储就会报错
print(q.qsize())
print(q.get())
print(q.get())
print(q.get())
# print(q.get(timeout=2))#在2秒内尚未可取的就会报错
# print(q.get())
# print(q.get())
# print(q.empty())
print(q.get(block=False))
# print(q.get_nowait())

10.6 生产者消费者模型 

#!/usr/bin/Python
# -*- coding:utf-8 -*-
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        time.sleep(random.randint(1,3))
        res=q.get()
        print("\033[45m消费者拿到了:%s\033[0m"%res)
def producer(seq,q):
    for item in seq:
        time.sleep(random.randint(1,3))
        print("\033[46m生产者生产了:%s\033[0m"%item)
        q.put(item)
if __name__ == '__main__':
    q=Queue()
    seq=('包子%s' %i for i in range(10))
    producer(seq,q)
    print('主线程')
    c =Process(target=consumer,args=(q,))
    c.start()

主线程等待消费者结束(生产者发送结束信号给消费者)

#!/usr/bin/Python
# -*- coding:utf-8 -*-
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        # time.sleep(random.randint(1,3))
        res=q.get()
        print("\033[45m消费者拿到了:%s\033[0m"%res)
def producer(seq,q):
    for item in seq:
        # time.sleep(random.randint(1,3))
        print("\033[46m生产者生产了:%s\033[0m"%item)
        q.put(item)
if __name__ == '__main__':
    q=Queue()
    c =Process(target=consumer,args=(q,))
    c.start()
    seq=('包子%s' %i for i in range(10))
    producer(seq,q)
    c.join()
    print('主线程')

JoinableQueue类

JoinableQueue([maxsize]):这就像是一个Queue对象,但队列容许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

    参数介绍:

    maxsize是队列中容许最大项数,省略则无大小限制。    

  方法介绍:

    JoinableQueue的实例p除了与Queue对象相同的方法以外还具备:
    q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。若是调用此方法的次数大于从队列中删除项目的数量,将引起ValueError异常
    q.join():生产者调用此方法进行阻塞,直到队列中全部的项目均被处理。阻塞将持续到队列中的每一个项目均调用q.task_done()方法为止
from multiprocessing import Process,JoinableQueue
import time
import random

def consumer(q,name):
    while True:
        # time.sleep(random.randint(1,3))
        res=q.get()
        q.task_done()
        print('\033[41m消费者%s拿到了%s\033[0m' %(name,res))

def producer(seq,q,name):
    for item in seq:
        # time.sleep(random.randint(1,3))
        q.put(item)
        print('\033[42m生产者%s生产了%s\033[0m' %(name,item))
    q.join()
    print('============>>')

if __name__ == '__main__':
    q=JoinableQueue()
    c=Process(target=consumer,args=(q,'egon'),)
    c.daemon=True #设置守护进程,主进程结束c就结束
    c.start()

    seq=['包子%s' %i for i in range(10)]
    p=Process(target=producer,args=(seq,q,'厨师1'))
    p.start()

    # master--->producer----->q--->consumer(10次task_done)
    p.join() #主进程等待p结束,p等待c把数据都取完,c一旦取完数据,p.join就是再也不阻塞,进
    # 而主进程结束,主进程结束会回收守护进程c,并且c此时也没有存在的必要了
    print('主进程')

 一个生产者多个消费者

from multiprocessing import Process,JoinableQueue
import time,random
def consumer(name,q):
    while True:
        time.sleep(random.randint(1,2))
        res=q.get()
        print('\033[45m%s拿到了 %s\033[0m' %(name,res))
        q.task_done()


def producer(seq,q):
    for item in seq:
        time.sleep(random.randrange(1,2))
        q.put(item)
        print('\033[46m生产者作好了 %s\033[0m' %item)
    q.join()

if __name__ == '__main__':
    q=JoinableQueue()
    seq=('包子%s' %i for i in range(10))

    p1=Process(target=consumer,args=('消费者1',q,))
    p2=Process(target=consumer,args=('消费者2',q,))
    p3=Process(target=consumer,args=('消费者3',q,))
    p1.daemon=True
    p2.daemon=True
    p3.daemon=True
    p1.start()
    p2.start()
    p3.start()

    producer(seq,q)

    print('主线程')

也能够开启一个新的子进程当生产者,不用主线程当生产者

from multiprocessing import Process,JoinableQueue
import time,random
def consumer(name,q):
    while True:
        # time.sleep(random.randint(1,2))
        res=q.get()
        print('\033[45m%s拿到了 %s\033[0m' %(name,res))
        q.task_done()


def producer(seq,q):
    for item in seq:
        # time.sleep(random.randrange(1,2))
        q.put(item)
        print('\033[46m生产者作好了 %s\033[0m' %item)
    q.join()

if __name__ == '__main__':
    q=JoinableQueue()
    seq=['包子%s' %i for i in range(10)] #在windows下没法传入生成器,咱们能够用列表解析测试

    p1=Process(target=consumer,args=('消费者1',q,))
    p2=Process(target=consumer,args=('消费者2',q,))
    p3=Process(target=consumer,args=('消费者3',q,))
    p1.daemon=True
    p2.daemon=True
    p3.daemon=True
    p1.start()
    p2.start()
    p3.start()

    # producer(seq,q) #也能够是下面三行的形式,开启一个新的子进程当生产者,不用主线程当生产者
    p4=Process(target=producer,args=(seq,q))
    p4.start()
    p4.join()
    print('主线程')

2.管道方式

管道也能够说是队列的另一种形式,下面咱们就开始介绍基于管道实现金城之间的消息传递

建立管道的类:

Pipe([duplex]):在进程之间建立一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的链接对象,强调一点:必须在产生Process对象以前产生管道

参数介绍:

dumplex:默认管道是全双工的,若是将duplex射成False,conn1只能用于接收,conn2只能用于发送。

 方法介绍:

 conn1.recv():接收conn2.send(obj)发送的对象。若是没有消息可接收,recv方法会一直阻塞。若是链接的另一端已经关闭,那么recv方法会抛出EOFError。

 conn1.send(obj):经过链接发送对象。obj是与序列化兼容的任意对象
1.conn1.close():关闭链接。若是conn1被垃圾回收,将自动调用此方法
2.conn1.fileno():返回链接使用的整数文件描述符
3.conn1.poll([timeout]):若是链接上的数据可用,返回True。timeout指定等待的最长时限。若是省略此参数,方法将当即返回结果。若是将timeout射成None,操做将无限期地等待数据到达。 4.conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。若是进入的消息,超过了这个最大值,将引起IOError异常,而且在链接上没法进行进一步读取。若是链接的另一端已经关闭,不再存在任何数据,将引起EOFError异常。
5.conn.send_bytes(buffer [, offset [, size]]):经过链接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,而后调用c.recv_bytes()函数进行接收 6.conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或相似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。若是消息长度大于可用的缓冲区空间,将引起BufferTooShort异常。

基于管道实现进程间通讯(与队列的方式是相似的,队列就是管道加锁实现的):

from multiprocessing import Process,Pipe

import time,os
def consumer(p,name):
    left,right=p
    left.close()
    while True:
        try:
            baozi=right.recv()
            print('%s 收到包子:%s' %(name,baozi))
        except EOFError:
            right.close()
            break
def producer(seq,p):
    left,right=p
    right.close()
    for i in seq:
        left.send(i)
        # time.sleep(1)
    else:
        left.close()
if __name__ == '__main__':
    left,right=Pipe()

    c1=Process(target=consumer,args=((left,right),'c1'))
    c1.start()


    seq=(i for i in range(10))
    producer(seq,(left,right))

    right.close()
    left.close()

    c1.join()
    print('主进程')

注意:生产者和消费者都没有使用管道的某个端点,就应该将其关闭,如在生产者中关闭管道的右端,在消费者中关闭管道的左端。若是忘记执行这些步骤,程序可能再消费者中的recv()操做上挂起。管道是由操做系统进行引用计数的,必须在全部进程中关闭管道后才能生产EOFError异常。所以在生产者中关闭管道不会有任何效果,付费消费者中也关闭了相同的管道端点。

管道能够用于双向通讯,利用一般在客户端/服务器中使用的请求/响应模型或远程过程调用,就可使用管道编写与进程交互的程序,以下

#注意:send()和recv()方法使用pickle模块对对象进行序列化。
from multiprocessing import Process,Pipe import time,os def adder(p,name): server,client=p client.close() while True: try: x,y=server.recv() except EOFError: server.close() break res=x+y server.send(res) print('server done') if __name__ == '__main__': server,client=Pipe() c1=Process(target=adder,args=((server,client),'c1')) c1.start() server.close() client.send((10,20)) print(client.recv()) client.close() c1.join() print('主进程')

12 进程间通讯方式三:共享数据

12 .1进程共享数据

from multiprocessing import Manager,Process
import os
def work(d,l):
    l.append(os.getpid())
    d[os.getpid()]=os.getpid()

if __name__ == '__main__':
    m=Manager()
    l=m.list(['init',])
    d=m.dict({'name':'egon'})


    # p1=Process(target=work,args=(d,l))
    # p2=Process(target=work,args=(d,l))
    # p3=Process(target=work,args=(d,l))
    # p4=Process(target=work,args=(d,l))
    # p5=Process(target=work,args=(d,l))
    #
    # p_l=[p1,p2,p3,p4,p5]
    # for p in p_l:
    #     p.start()
    #
    # for p in p_l:
    #     p.join()

    p_l=[]
    for i in range(5):
        p=Process(target=work,args=(d,l))
        p_l.append(p)
        p.start()

    for p in p_l:
        p.join()
    print(d)
    print(l)

12.2  进程同步锁 

加锁的目的是为了保证多个进程修改同一块数据时,同一时间只能有一个修改,即串行的修改,没错,速度是慢了,牺牲了速度而保证了数据安全。

在4.4小节咱们学习了队列,队列是管道+锁实现的,于是咱们无需考虑复杂的锁问题。

可是在4.3小节中咱们介绍到,进程之间数据隔离,可是共享一套文件系统,于是能够经过文件来实现进程直接的通讯,但问题是必须本身加锁处理

因此,就让咱们帮文件当作数据库,模拟抢票(Lock互斥锁)

#文件db的内容为:{"count":1}
#注意必定要用双引号,否则json没法识别
from multiprocessing import Process,Lock
import json
import time
import random
import os

def work(filename,lock): #买票
    # lock.acquire()
    with lock:
        with open(filename,encoding='utf-8') as f:
            dic=json.loads(f.read())
            # print('剩余票数: %s' % dic['count'])
        if dic['count'] > 0:
            dic['count']-=1
            time.sleep(random.randint(1,3)) #模拟网络延迟
            with open(filename,'w',encoding='utf-8') as f:
                f.write(json.dumps(dic))
            print('%s 购票成功' %os.getpid())
        else:
            print('%s 购票失败' %os.getpid())
    # lock.release()

if __name__ == '__main__':
    lock=Lock()
    p_l=[]
    for i in range(100):
        p=Process(target=work,args=('db',lock))
        p_l.append(p)
        p.start()
    for p in p_l:
        p.join()

    print('主线程')

13 进程池 

13.1 Pool类

在使用Python进行系统管理时,特别是同时操做多个文件目录或者远程控制多台主机,并行操做能够节约大量的时间。若是操做的对象数目不大时,还能够直接使用Process类动态的生成多个进程,十几个还好,可是若是上百个甚至更多,那手动去限制进程数量就显得特别的繁琐,此时进程池就派上用场了。 
Pool类能够提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,若是池尚未满,就会建立一个新的进程来执行请求。若是池满,请求就会告知先等待,直到池中有进程结束,才会建立新的进程来执行这些请求。
下面介绍一下multiprocessing 模块下的Pool类下的几个方法

#!/usr/bin/python
# -*- coding:utf-8 -*-
from multiprocessing import Process,Lock
import json
import time
import random
def work(dbfile,name,lock):
    # lock.acquire()
    with lock:
        with open(dbfile,encoding='utf-8') as f:
            dic=json.loads(f.read())

        if dic['count'] > 0:
            dic['count']-=1
            time.sleep(random.randint(1,3)) #模拟网络延迟
            with open(dbfile,'w',encoding='utf-8') as f:
                f.write(json.dumps(dic))
            print('\033[43m%s 抢票成功\033[0m' %name)
        else:
            print('\033[45m%s 抢票失败\033[0m' %name)
    # lock.release()


if __name__ == '__main__':
    lock=Lock()
    p_l=[]
    for i in range(100):
        p=Process(target=work,args=('a.txt','用户%s' %i,lock))
        p_l.append(p)
        p.start()


    for p in p_l:
        p.join()
    print('主进程')

使用进程池进程socket通信 

客户端

#客户端
#!/usr/bin/Python # -*- coding:utf-8 -*- from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8081)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))

服务端

#!/usr/bin/python
# -*- coding:utf-8 -*-
from multiprocessing import Process,Pool
from socket import *
import os
server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8081))
server.listen(5)

def talk(conn,addr):
print(os.getpid())
while True: #通信循环
try:
msg=conn.recv(1024)
if not msg:break
conn.send(msg.upper())
except Exception:
break
if __name__ == '__main__':
pool=Pool()
res_l=[]
while True: #连接循环
conn,addr=server.accept()
# print(addr)
# pool.apply(talk,args=(conn,addr))
res=pool.apply_async(talk,args=(conn,addr))
res_l.append(res)
# print(res_l)

13.2 回调函数

相关文章
相关标签/搜索