Python学习笔记 - day13 - 进程与线程

概述

  咱们都知道windows是支持多任务的操做系统。html

  什么叫“多任务”呢?简单地说,就是操做系统能够同时运行多个任务。打个比方,你一边在用浏览器上网,一边在听MP3,一边在用Word赶做业,这就是多任务,至少同时有3个任务正在运行。还有不少任务悄悄地在后台同时运行着,只是桌面上没有显示而已。python

  如今,多核CPU已经很是普及了,可是,即便过去的单核CPU,也能够执行多任务。因为CPU执行代码都是顺序执行的,那么,单核CPU是怎么执行多任务的呢?linux

  答案就是操做系统轮流让各个任务交替执行,任务1执行0.01秒,切换到任务2,任务2执行0.01秒,再切换到任务3,执行0.01秒……这样反复执行下去。表面上看,每一个任务都是交替执行的,可是,因为CPU的执行速度实在是太快了,咱们感受就像全部任务都在同时执行同样。nginx

  真正的并行执行多任务只能在多核CPU上实现,可是,因为任务数量远远多于CPU的核心数量,因此,操做系统也会自动把不少任务轮流调度到每一个核心上执行。web

  对于操做系统来讲,一个任务就是一个进程(Process),好比打开一个浏览器就是启动一个浏览器进程,打开一个记事本就启动了一个记事本进程,打开两个记事本就启动了两个记事本进程,打开一个Word就启动了一个Word进程。编程

  有些进程还不止同时干一件事,好比Word,它能够同时进行打字、拼写检查、打印等事情。在一个进程内部,要同时干多件事,就须要同时运行多个“子任务”,咱们把进程内的这些“子任务”称为线程(Thread)。json

  因为每一个进程至少要干一件事,因此,一个进程至少有一个线程。固然,像Word这种复杂的进程能够有多个线程,多个线程能够同时执行,多线程的执行方式和多进程是同样的,也是由操做系统在多个线程之间快速切换,让每一个线程都短暂地交替运行,看起来就像同时执行同样。固然,真正地同时执行多线程须要多核CPU才可能实现。windows

  咱们前面编写的全部的Python程序,都是执行单任务的进程,也就是只有一个线程。若是咱们要同时执行多个任务怎么办?有两种解决方案:api

    1. 一种是启动多个进程,每一个进程虽然只有一个线程,但多个进程能够一块执行多个任务。  
    2. 一种方法是启动一个进程,在一个进程内启动多个线程,这样,多个线程也能够一块执行多个任务。  

  固然还有第三种方法,就是启动多个进程,每一个进程再启动多个线程,这样同时执行的任务就更多了,固然这种模型更复杂,实际不多采用。浏览器

  总结一下就是,多任务的实现有3种方式:

    • 多进程模式;  
    • 多线程模式;  
    • 多进程+多线程模式  

  同时执行多个任务一般各个任务之间并非没有关联的,而是须要相互通讯和协调,有时,任务1必须暂停等待任务2完成后才能继续执行,有时,任务3和任务4又不能同时执行,因此,多进程和多线程的程序的复杂度要远远高于咱们前面写的单进程单线程的程序。

  Python既支持多进程,又支持多线程。

进程

  正在进行的一个过程或者说一个任务。而负责执行任务的则是CPU

  因为如今计算计算机都是多任务同时进行的,好比:打开了QQ,而后听着音乐,后面下载者片儿,那么这些都是怎么完成的呢?答案是经过多进程。操做系统会对CPU的时间进行规划,每一个进程执行一个任务(功能),CPU会快速的在这些进行之间进行切换已达到同时进行的目的(单核CPU的状况

进程与程序

程序:一堆代码的集合体。
进程:指的是程序运行的过程。
注意的是:一个程序执行两次,那么会产生两个互相隔离的进程。

并发与并行

并行:同时运行,只有具有多个CPU才能实现并行
并发:是伪并行,即看起来是同时运行。单个CPU+多道技术就能够实现并发。(并行也属于并发)

同步与异步

同步指一个进程在执行某个请求的时候,若该请求须要一段时间才能返回信息,那么这个进程将会一直等待下去,直到返回西南喜才继续执行下去。
异步是指进程不须要一直等下去,而是继续执行下面的操做,无论其余进程的状态。当有消息返回时系统会通知进程处理,这样能够提升执行的效率。
例子:打电话就是同步,发短信就是异步  

进程的建立

主要分为4种:
一、系统初始化:(查看进程Linux中用ps命令,windows中用任务管理器,前台进程负责与用户交互,后台运行的进程与用户无关,运行在后台而且只有在须要时才唤醒的进程,成为守护进程,如电子邮件,web页面,新闻,打印等)
二、一个进程在运行过程当中开启了子进程(如nginx开启多线程,操做系统os.fork(),subprocess.Popen等)
三、用户的交互请求,而建立一个新的进程(如用户双击QQ)
四、一个批处理做业的开始(只在大型批处理系统中应用)

以上四种其实都是由一个已经存在了的进程执行了一个用于建立进程的系统调用而建立的。

  1. 在unix/Linux系统中该调用是:fork,它很是特殊。普通的函数调用,调用一次,返回一次,可是fork()调用一次,返回两次,由于操做系统自动把当前进程(称为父进程)复制了一份(称为子进程),而后,分别在父进程和子进程内返回。子进程返回0,父进程返回子进程的PID。
  2. 在winodws中调用的是createProcess,CreateProcess既处理进程的建立,也负责把正确的程序装入新进程。

注意:

  1. 进程建立后父进程和子进程有各自不一样的地址空间(多道技术要求物理层面实现进程之间内存的隔离),任何一个进程的在其地址空间中的修改都不会影响到另外的进程。
  2. 在Unix/linux,子进程的初始地址空间是父进程的一个副本,子进程和父进程是能够有只读的共享内存区的。可是对于Winodws系统来讲,从一开始父进程与子进程的地址空间就是不一样的。

进程之间共享终端,共享一个文件系统

进程的状态

  进程的状态主要分为三种:进行、阻塞、就绪

线程 

  在传统的操做系统中,每一个进程有一个地址空间,并且默认就有一个控制线程,多线程(及多个控制线程)的概念是,在一个进程中存在多个控制线程,多个控制线程共享该进程的地址空间,进程只是用来把资源集中到一块儿(进程只是一个资源单位,或者说资源集合),而线程才是CPU的执行单位。

为什么要用多线程

多线程指的是,在一个进程中开启多个线程,简单来讲:若是多个任务公用一块地址空间,那么必须在一个进程内开启多个线程。
一、多线程共享一个进程的地址空间
二、线程比进程更轻量级,线程比进程更容易建立和撤销,在许多操做系统中,建立一个线程比建立一个进程要快10-100倍
三、对于CPU密集型的应用,多线程并不能提高性能,但对于I/O密集型,使用多线程会明显的提高速度(I/O密集型,根本用不上多核优点)
四、在多CPU系统中,为了最大限度的利用多核,能够开启多个线程(比开进程开销要小的多)    --> 针对其余语言
注意:
Python中的线程比较特殊,其余语言,1个进程内4个线程,若是有4个CPU的时候,是能够同时运行的,而Python在同一时间1个进程内,只有一个线程能够工做。(就算你有再多的CPU,对Python来讲用不上)

线程与进程的区别

一、线程共享建立它的进程的地址空间,进程拥有本身的地址空间
二、线程能够直接访问进程的数据,进程拥有它父进程内存空间的拷贝
三、线程能够和同一进程内其余的线程直接通讯,进程必须interprocess communicateion(IPC机制)进行通讯
四、线程能够被很容易的建立,而进程依赖于父进程内存空间的拷贝
五、线程能够直接控制同一进程内的其余线程,进程只能控制本身的子进程
六、改变主线程(控制)可能会影响其余线程,改变主进程不会影响它的子进程

multiprocessing模块

    python中的多线程没法利用多核优点,若是想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分状况须要使用多进程。Python提供了multiprocessing,该模块用来开启子进程,并在子进程中执行咱们定制的任务(好比函数),该模块与多线程模块threading的编程接口相似。

    multiprocessing模块的功能众多:支持子进程、通讯和共享数据、执行不一样形式的同步,提供了Process、Queue、Pipe、Lock等组件。

    须要再次强调的一点是:与线程不一样,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

Process类和使用

注意:在windows中Process()必须放到# if __name__ == '__main__':下

利用Process建立进程的类

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化获得的对象,表示一个子进程中的任务(还没有启动)

强调:
1. 须要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

参数:

  • group参数未使用,值始终为None
  • target表示调用对象,即子进程要执行的任务
  • args表示调用对象的位置参数元组,args=(1,2,'egon',)
  • kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
  • name为子进程的名称

Process类的方法  

p.start():
# 启动进程,并调用该子进程中的p.run()   --> 和直接调用run方法是不一样的,由于它会初始化部分其余参数。

p.run():
# 进程启动时运行的方法,正是它去调用target指定的函数,咱们自定义类的类中必定要实现该方法

p.terminate():
# 强制终止进程p,不会进行任何清理操做,若是p建立了子进程,该子进程就成了僵尸进程,使用该方法须要特别当心这种状况。若是p还保存了一个锁那么也将不会被释放,进而致使死锁

p.is_alive():
# 若是p仍然运行,返回True

p.join([timeout]):
# 主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,须要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程

Process的其余属性 

p.daemon:
# 默认值为False,若是设为True,表明p为后台运行的守护进程,当p的父进程终止时,p也随之终止,而且设定为True后,p不能建立本身的新进程,必须在p.start()以前设置

p.name:
# 进程的名称

p.pid:
# 进程的pid

p.exitcode:
# 进程在运行时为None、若是为–N,表示被信号N结束(了解便可)

p.authkey:
# 进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络链接的底层进程间通讯提供安全性,这类链接只有在具备相同的身份验证键时才能成功

特别强调:设置 p.daemon=True 是会随着主进程执行完毕而被回收,无论子进程是否完成任务。

基本使用

使用Process建立进程的类有两种方法:

  一、经过实例化Process类完成进程的建立

  二、继承Process类,定制本身须要的功能后实例化建立进程类

#  --------------------------- 方法1 ---------------------------
import random
import time
from multiprocessing import Process
 
def hello(name):
    print('Welcome to my Home')
    time.sleep(random.randint(1,3))
    print('Bye Bye')
 
p = Process(target=hello,args=('daxin',))    # 建立子进程p
p.start()      # 启动子进程
print('主进程结束')


#  --------------------------- 方法2 ---------------------------
import random
import time
from multiprocessing import Process

class MyProcess(Process):

    def __init__(self,name):
        super(MyProcess, self).__init__()    # 必须继承父类的构造函数
        self.name = name

    def run(self):     # 必须叫run方法,由于start,就是执行的run方法。
        print('Welcome to {0} Home'.format(self.name))
        time.sleep(random.randint(1,3))
        print('Bye Bye')

p = MyProcess('daxin')
p.start()
print('主进程结束')

利用多进程完成修改socket server

上一节咱们利用socket完成了socket server的编写,这里咱们使用multiprocessing对server端进行改写,完成并发接受请求的功能。

Socket Server端
Socket client端

若是服务端接受上万个请求,那么岂不是要建立1万个进程去分别对应?这样是不行的,那么咱们可使用进程池的概念来解决这个问题,进程池的问题,在后续小节中详细说明

进程同步锁

进程之间数据不共享,可是共享同一套文件系统,因此访问同一个文件,或同一个打印终端,是没有问题的,竞争带来的结果就是错乱,如何控制,就是加锁处理。

争抢资源形成的顺序问题

锁的目的就是:当程序1在使用的时候,申请锁,而且锁住共享资源,待使用完毕后,释放锁资源,其余程序获取锁后,重复这个过程。

Multiprocessing模块提供了Lock对象用来完成进程同步锁的功能

from multiprocessing import Lock
lock = Lock()

# 对象没有参数
# 经过使用lock对象的acquire/release方法来进行 锁/释放 的需求。

利用进程同步锁模拟抢票软件的需求:

  1. 建立票文件,内容为json,设置余票数量
  2. 并发100个进程抢票
  3. 利用random + time 模块模拟网络延迟
import random
import time
import json
from multiprocessing import Process,Lock

def gettickles(filename,str,lock):

    lock.acquire()      # 对要修改的部分加锁
    with open(filename,encoding='utf-8') as f:
        dic = json.loads(f.read())

    if dic['count'] > 0 :
        dic['count'] -= 1
        time.sleep(random.random())
        with open(filename,'w',encoding='utf-8') as f:
            f.write(json.dumps(dic))
        print('\033[33m{0}抢票成功\033[0m'.format(str))
    else:
        print('\033[35m{0}抢票失败\033[0m'.format(str))

    lock.release()     # 修改完毕后解锁

if __name__ == '__main__':
    lock = Lock()   # 建立一个锁文件

    p_l = []
    for i in range(1000):
        p = Process(target=gettickles,args=('a.txt','用户%s' % i,lock))
        p_l.append(p)
        p.start()   

加锁能够保证多个进程修改同一块数据时,同一时间只能有一个任务能够进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。

进程池

在利用Python进行系统管理的时候,特别是同时操做多个文件目录,或者远程控制多台主机,并行操做能够节约大量的时间。多进程是实现并发的手段之一,须要注意的问题是:

  1. 很明显须要并发执行的任务一般要远大于核数
  2. 一个操做系统不可能无限开启进程,一般有几个核就开几个进程
  3. 进程开启过多,效率反而会降低(开启进程是须要占用系统资源的,并且开启多余核数目的进程也没法作到并行)

例如当被操做对象数目不大时,能够直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但若是是上百个,上千个。。。手动的去限制进程数量却又太过繁琐,此时能够发挥进程池的功效。

咱们就能够经过维护一个进程池来控制进程数目,好比httpd的进程模式,规定最小进程数和最大进程数... 
ps:对于远程过程调用的高级应用程序而言,应该使用进程池,Pool能够提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,若是池尚未满,那么就会建立一个新的进程用来执行该请求;但若是池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。

    建立进程池的类:若是指定numprocess为3,则进程池会从无到有建立三个进程,而后自始至终使用这三个进程去执行全部任务,不会开启其余进程

from multiprocessing import Pool
pool = Pool(processes=None, initializer=None, initargs=())

参数:

  • processes:进程池的最大进程数量
  • initiallizer:初始化完毕后要执行的函数
  • initargs:要传递给函数的参数

经常使用方法

p.apply(func [, args [, kwargs]])  
# 调用进程池中的一个进程执行函数func,args/kwargs为传递的参数,注意apply是阻塞式的,既串行执行。

p.apply_async(func [, args [, kwargs]])  
# 功能同apply,区别是非阻塞的,既异步执行。   ———> 经常使用

p.close() 
# 关闭进程池,防止进一步操做。若是全部操做持续挂起,它们将在工做进程终止前完成

P.join() 
# 等待全部工做进程退出。此方法只能在close()或teminate()以后调用

注意:

  apply_async 会返回AsyncResul对象,这个AsyncResul对象有有一下方法:

View Code

利用进程池改写socket server:

import os
import socket
import multiprocessing

server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
server.bind(('127.0.0.1',8100))
server.listen(5)

def talk(conn):
    print('个人进程号是: %s' % os.getpid() )
    while True:
        msg = conn.recv(1024)
        if not msg:break
        data = msg.decode('utf-8')
        msg = data.upper()
        conn.send(msg.encode('utf-8'))

if __name__ == '__main__':
    pool = multiprocessing.Pool(1)

    while True:

        conn,addr = server.accept()
        print(addr)
        pool.apply_async(talk,args=(conn,))
        pool.close()
        pool.join()

  这里指定了进程池的数量为1,那么并发两个链接的话,第二个会hold住,只有第一个断开后,才会链接,注意:进程的Pid号,仍是相同的。 

回调函数

  须要回调函数的场景:进程池中任何一个任务一旦处理完了,就当即告知主进程:我好了额,你能够处理个人结果了。主进程则调用一个函数去处理该结果,该函数即回调函数。咱们能够把耗时间(阻塞)的任务放到进程池中,而后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。

apply_async(self, func, args=(), kwds={}, callback=None)

#  func的结果会交给指定的callback函数处理

 一个爬虫的小例子:

from multiprocessing import Pool
import requests
import os


def geturl(url):
    print('个人进程号为: %s' % os.getpid())
    print('我处理的url为: %s ' % url )
    response = requests.get(url)    # 请求网页
    return response.text     # 返回网页源码


def urlparser(htmlcode):
    print('个人进程号是: %s ' % os.getpid())
    datalength = len(htmlcode)      # 计算源码的长度
    print('解析到的html大小为: %s' % datalength)



if __name__ == '__main__':
    pool = Pool()
    url = [
        'http://www.baidu.com',
        'http://www.sina.com',
        'http://www.qq.com',
        'http://www.163.com'
    ]

    res_l = []
    for i in url:
        res = pool.apply_async(geturl,args=(i,),callback=urlparser)    # res 是 geturl执行的结果,由于已经交给urlparser处理了,因此这里不用拿
        res_l.append(res)

    pool.close()
    pool.join()
    for res in res_l:
        print(res.get())    # 这里拿到的就是网页的源码

进程间通信 

  进程彼此之间互相隔离,要实现进程间通讯(IPC),multiprocessing模块提供的两种形式:队列和管道,这两种方式都是使用消息传递的。可是还有一种基于共享数据的方式,如今已经不推荐使用,建议使用队列的方式进行进程间通信。

  展望将来,基于消息传递的并发编程是大势所趋,即使是使用线程,推荐作法也是将程序设计为大量独立的线程集合,经过消息队列交换数据。这样极大地减小了对使用锁定和其余同步手段的需求,还能够扩展到分布式系统中。

队列

  底层就是以管道和锁定的方式实现。

建立队列的类

Queue([maxsize]):建立共享的进程队列,Queue是多进程安全的队列,可使用Queue实现多进程之间的数据传递。 

# 参数
maxsize: 队列能承载的最大数量,省略的话则不限制队列大小

基本使用:

from multiprocessing import Queue

q = Queue(3)
q.put('a')          # 数据存入Queue
print(q.get())      # 从Queue中取出数据

注意:队列(Queue)是FIFO模式,既先进先出。

队列的方法

q.put() 用于插入数据到队列中。

q.put(obj, block=True, timeout=None)  

# 参数:
#    blocked,timeout:若是blocked为True(默认值),而且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。若是超时,会抛出Queue.Full异常。若是blocked为False,但该Queue已满,会当即抛出Queue.Full异常。

PS:q.put_nowait() 等同于 q.put(block=False)

q.get()  用于从队列中获取数据。

q.get(block=True,timeout=None)

# 参数:
# blocked和timeout。若是blocked为True(默认值),而且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。若是blocked为False,有两种状况存在,若是Queue有一个值可用,则当即返回该值,不然,若是队列为空,则当即抛出Queue.Empty异常.

PS:q.get_nowait() 等同于 q.get(block=False)

其余的方法(不是特别准确,能够忘记)

生产者消费者模型

  在并发编程中使用生产者和消费者模式可以解决绝大多数并发问题。该模式经过平衡生产线程和消费线程的工做能力来提升程序的总体处理数据的速度。

为何要使用生产者和消费者模式

  在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题因而引入了生产者和消费者模式。

什么是生产者消费者模式

  生产者消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而经过阻塞队列来进行通信,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力。

基于队列实现生产者消费者模型:

  • 生产者只负责生产蛋糕,生产完毕的蛋糕放在队列中
  • 消费者只负责消费蛋糕,每次从队列中拿取蛋糕
生产者消费者基础模型

上面的例子很完美,可是生产者生产完毕,消费者也消费完毕了,那么咱们的主程序就应该退出了,但是并无,由于消费者还在等待从队列中获取(q.get),这里咱们考虑能够发送一个作完/吃完的信号,抓取到信号后退出便可。

  • 在队列中放固定的值来作信号
  • 利用JoinableQueue对象 + daemon属性 来对消费者进程进行回收
使用关键字信号
使用JoinableQueue对象
JoinableQueue + daemon

其中:

  1. 利用JoinableQueue对象的join,task_done方法,完成确认/通知的目的。
  2. 若是生产者生产完毕,消费者必然也会给生产者确认消费完毕,那么只要等待生产者执行完毕后进行就能够退出主进程了。
  3. 主进程退出可是消费者进程还未回收,那么就能够设置消费者daemon属性为true,跟随主进程被回收便可。

共享数据

  进程间数据是独立的,能够借助于队列或管道实现通讯,两者都是基于消息传递的,虽然进程间数据独立,但也能够经过Manager实现数据共享,事实上Manager的功能远不止于此。

Manager()  
# 没有参数

# 使用Manager对象建立共享数据类型

利用Manager建立数据,完成进程共享

import os
from multiprocessing import Manager,Process

def worker(d,l):

    d[os.getpid()]=os.getpid()   # 对共享数据进行修改
    l.append(os.getpid())

if __name__ == '__main__':
    m = Manager()
    d = m.dict()    # 建立共享字典
    l = m.list()    # 建立共享列表

    p_l = []
    for i in range(10):
        p= Process(target=worker,args=(d,l))
        p_l.append(p)
        p.start()

    for p in p_l:
        p.join()

    print(d)
    print(l)

Threading模块

Python 标准库提供了 thread 和 threading 两个模块来对多线程进行支持。其中, thread 模块以低级、原始的方式来处理和控制线程,而 threading 模块经过对 thread 进行二次封装,提供了更方便的 api 来处理线程。

PS:multiprocessing彻底模仿了threading模块的接口,两者在使用层面,有很大的类似性,因此不少用法都是相同的,因此可能看起来会比较眼熟。

Thread类和使用

Thread 是threading模块中最重要的类之一,可使用它来建立线程。

有两种方式来建立线程:

  1. 经过继承Thread类,重写它的run方法;
  2. 建立一个threading.Thread对象,在它的初始化函数(__init__)中将可调用对象做为参数传入;
# -----------------------实例化对象--------------------------
import threading

def work(name):

    print('hello,{0}'.format(name))



if __name__ == '__main__':
    t = threading.Thread(target=work,args=('daxin',))
    t.start()

    print('主进程')


# -----------------------本身建立类--------------------------
import threading


class Work(threading.Thread):

    def __init__(self,name):
        super(Work, self).__init__()
        self.name = name

    def run(self):
        print('hello,{0}'.format(self.name))


if __name__ == '__main__':
    t = Work(name='daxin')
    t.start()

    print('主进程')

PS:执行的时候,咱们能够看到会先打印"hello,daxin",而后才会打印"主进程",因此这也同时说明了,建立线程比建立进程消耗资源少的多,线程会被很快的建立出来并执行。若是咱们在target执行的函数和主函数中,同时打印os.getpid,你会发现,进程号是相同的,这也说明了这里开启的是自线程。

相关文章
相关标签/搜索