Python——并发编程

开始说并发编程以前,最好有必定的底层知识积累,这里我把须要的知识总结了一下,若是看下面的有不理解的能够看一下:http://www.javashuo.com/article/p-rquxjwfh-dk.htmlhtml

引子

  • 计算机的核心是CPU,它承担了全部的计算任务。它就像一座工厂,时刻在运行。
  • 假定工厂的电力有限,一次只能供给一个车间使用。也就是说,一个车间开工的时候,其余车间都必须停工。背后的含义就是,单个CPU一次只能运行一个任务。
  • 进程就比如工厂的车间,它表明CPU所能处理的单个任务。任一时刻,CPU老是运行一个进程,其余进程处于非运行状态。
  • 一个车间里,能够有不少工人。他们协同完成一个任务。
  • 线程就比如车间里的工人。一个进程能够包括多个线程。
  • 车间的空间是工人们共享的,好比许多房间是每一个工人均可以进出的。这象征一个进程的内存空间是共享的,每一个线程均可以使用这些共享内存。
  • 但是,每间房间的大小不一样,有些房间最多只能容纳一我的,好比厕所。里面有人的时候,其余人就不能进去了。这表明一个线程使用某些共享内存时,其余线程必须等它结束,才能使用这一块内存。
  • 一个防止他人进入的简单方法,就是门口加一把锁。先到的人锁上门,后到的人看到上锁,就在门口排队,等锁打开再进去。这就叫"互斥锁"(Mutual exclusion,缩写 Mutex),防止多个线程同时读写某一块内存区域。
  • 还有些房间,能够同时容纳n我的,好比厨房。也就是说,若是人数大于n,多出来的人只能在外面等着。这比如某些内存区域,只能供给固定数目的线程使用。
  • 这时的解决方法,就是在门口挂n把钥匙。进去的人就取一把钥匙,出来时再把钥匙挂回原处。后到的人发现钥匙架空了,就知道必须在门口排队等着了。这种作法叫作"信号量"(Semaphore),用来保证多个线程不会互相冲突。

不难看出,互斥锁是信号量的一种特殊状况(n=1时)。也就是说,彻底能够用后者替代前者。可是,由于互斥锁较为简单,且效率高,因此在必须保证资源独占的状况下,仍是采用这种设计。python

 

上面的内容转载自:http://www.ruanyifeng.com/blog/2013/04/processes_and_threads.html编程

看了上面简单的,再说一下复杂的api

进程

进程(英语:process),是计算机中已运行程序的实体。进程为曾经是分时系统的基本运做单位。在面向进程设计的系统(如早期的UNIX,Linux 2.4及更早的版本)中,进程是程序的基本执行实体;在面向线程设计的系统(如当代多数操做系统、Linux 2.6及更新的版本)中,进程自己不是基本运行单位,而是线程的容器。程序自己只是指令、数据及其组织形式的描述,进程才是程序(那些指令和数据)的真正运行实例。若干进程有可能与同一个程序相关系,且每一个进程皆能够同步(循序)或异步(平行)的方式独立运行。现代计算机系统可在同一段时间内以进程的形式将多个程序加载到存储器中,并借由时间共享(或称时分复用),以在一个处理器上表现出同时(平行性)运行的感受。一样的,使用多线程技术(多线程即每个线程都表明一个进程内的一个独立执行上下文)的操做系统或计算机体系结构,一样程序的平行线程,可在多CPU主机或网络上真正同时运行(在不一样的CPU上)。安全

关于进程在操做系统中的状态及调度流程以下图性能优化

简单图:网络

复杂图:数据结构

线程

线程(英语:thread)是操做系统可以进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运做单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中能够并发多个线程,每条线程并行执行不一样的任务。在Unix System V及SunOS中也被称为轻量进程(lightweight processes),但轻量进程更多指内核线程(kernel thread),而把用户线程(user thread)称为线程。
线程是独立调度和分派的基本单位。线程能够为操做系统内核调度的内核线程,如Win32线程;由用户进程自行调度的用户线程,如Linux平台的POSIX Thread;或者由内核与用户进程,如Windows 7的线程,进行混合调度。
同一进程中的多条线程将共享该进程中的所有系统资源,如虚拟地址空间,文件描述符和信号处理等等。但同一进程中的多个线程有各自的调用栈(call stack),本身的寄存器环境(register context),本身的线程本地存储(thread-local storage)。
一个进程能够有不少线程,每条线程并行执行不一样的任务。
在多核或多CPU,或支持Hyper-threading的CPU上使用多线程程序设计的好处是显而易见,即提升了程序的执行吞吐率。在单CPU单核的计算机上,使用多线程技术,也能够把进程中负责I/O处理、人机交互而常被阻塞的部分与密集计算的部分分开来执行,编写专门的workhorse线程执行密集计算,从而提升了程序的执行效率。多线程

总结:并发

  1. 一个程序至少有一个进程,一个进程至少有一个线程.(进程能够理解成线程的容器)
  2. 进程在执行过程当中拥有独立的内存单元,而多个线程共享内存,从而极大地提升了程序的运行效率。
  3. 线程是最小的执行单元,进程是最小的资源单位。
  4. 进程本质上就是一段程序的运行过程。
  5. 线程在执行过程当中与进程仍是有区别的。每一个独立的线程有一个程序运行的入口、顺序执行序列和程序的出口。可是线程不可以独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。

  6. 进程是具备必定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位. 线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程本身基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈)可是它可与同属一个进程的其余的线程共享进程所拥有的所有资源. 一个线程能够建立和撤销另外一个线程;同一个进程中的多个线程之间能够并发执行。

  7. 进程间的切换比线程间的切换要耗时的多得多。

python中的GIL

关于python的GIL锁简单说就是:不管你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只容许一个线程运行。

因为GIL的存在,python中多线程其实并非真正的多线程,若是想要充分发挥CPU的资源,在python中大部分状况须要使用多进程。然后面又通过优化,出现了多线程加协程。

在开发中,有两种比较常见的处理状况,一个是IO密集型,一个是计算密集型,因为GIL锁的存在,因此python仍是比较适用于IO操做,由于这样能发挥多线程的能力,而要实现计算密集型就须要开多进程,才可以真正的发挥计算机多核的能力。

这里转一下别人的文章,感受写的很全面,很少说明:http://python.jobbole.com/87743/

多线程

 在python中使用的是threading模块,它创建在thread 模块之上。thread模块以低级、原始的方式来处理和控制线程,而threading 模块经过对thread进行二次封装,提供了更方便的api来处理线程。

调用方式

直接调用

import threading
import time
 
def sayhi(num): #定义每一个线程要运行的函数
 
    print("running on number:%s" %num)
 
    time.sleep(3)
 
if __name__ == '__main__':
 
    t1 = threading.Thread(target=sayhi,args=(1,)) #生成一个线程实例
    t2 = threading.Thread(target=sayhi,args=(2,)) #生成另外一个线程实例
 
    t1.start() #启动线程,能够认为是让进程进入上面进程图中的就绪状态
    t2.start() #启动另外一个线程
 
    print(t1.getName()) #获取线程名
    print(t2.getName())
View Code

 注意,函数传入的时候要传入函数对象,也就是不加括号的形式。

继承式调用

import threading
import time


class MyThread(threading.Thread):
    def __init__(self,num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self):  #继承式调用必需要重写这个函数

        print("running on number:%s" %self.num)    # self.name是这个线程的名字

        time.sleep(3)

if __name__ == '__main__':

    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()
View Code

线程经常使用方法

实例对象方法

  • join()
    • 在子线程完成运行以前,这个子线程的父线程将一直被阻塞。
  • setDaemon(Ture)
    •  将线程声明为守护线程,必须在start() 方法调用以前设置, 若是不设置为守护线程程序会被无限挂起。这个方法基本和join是相反的。

      当咱们 在程序运行中,执行一个主线程,若是主线程又建立一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成

      想退出时,会检验子线程是否完成。如 果子线程未完成,则主线程会等待子线程完成后再退出。可是有时候咱们须要的是 只要主线程

      完成了,无论子线程是否完成,都要和主线程一块儿退出,这时就能够 用setDaemon方法啦

  • run()
    • 线程被CPU调度后自动执行线程对象的run方法
  • start()
    • 启动线程对象,能够认为让其进入就绪状态
  • isAlive()
    • 返回线程是否活动的
  • getName()
    • 返回线程名
  • setName()
    • 设置线程名

threading模块提供的方法

  • threading.currentThread():
    • 返回当前的线程变量。
  • threading.enumerate():
    • 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  • threading.activeCount():
    • 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

互斥锁

 首先看看下面的代码

import time
import threading

NUM = 100  # 设定一个共享变量
thread_list = []


def lessNum():
    global NUM  # 在每一个线程中都获取这个全局变量
    temp = NUM
    time.sleep(0.1)
    NUM = temp - 1  # 对此公共变量进行-1操做


if __name__ == '__main__':
    for i in range(100):
        t = threading.Thread(target=lessNum)
        t.start()
        thread_list.append(t)

    for t in thread_list:  # 等待全部线程执行完毕
        t.join()

    print('final num:', NUM)
View Code

观察:time.sleep(0.1)  /0.001/0.0000001 结果分别是多少?

按照咱们的预想,应该是100个线程每次减1,而后结果是0,可是实际上不是这样,

当让函数睡0.1秒后,个人电脑的NUM的结果是99,

当让函数睡0.001秒后个人电脑的NUM的结果是90左右,

当让函数睡0.001秒后个人电脑的NUM的结果是也是90左右,

固然了,不一样电脑由于运算速度不同,可能会产生不一样的结果。可是不管怎样也不是咱们预想的结果0,是由于,当线程开启后,这100个线程操做的是同一个资源,当让函数睡0.1秒后,按照如今大部分电脑的运行速度,足够全部的线程得到那个NUM,而那个NUM的值都是100,减一以后,至关于给这个NUM赋值了100个100-1的结果,天然就是99,后面的0.001,和0.0000001结果也大体是这样的流程,只不过随着时间的减小,线程被cpu调用时须要必定的时间,因此一些线程处理的是其余线程处理以后的结果。

由此能够看出多个线程都在同时操做同一个共享资源,可能形成了资源破坏,怎么办呢?(join会形成串行,失去所线程的意义)

咱们能够经过同步锁来解决这种问题,咱们将核心的处理共享数据的地方用锁锁住,就像引子中写的那样,同一时间内只让一个线程访问这个资源。

互斥锁的格式为:

lock = threading.Lock()
lock.acquire()
要锁的内容
lock.release()

上面例子中加锁以后的样子

import time
import threading

NUM = 100  # 设定一个共享变量
thread_list = []


def lessNum():
    global NUM  # 在每一个线程中都获取这个全局变量
    lock.acquire()
    temp = NUM
    time.sleep(0.0000001)
    NUM = temp - 1  # 对此公共变量进行-1操做
    lock.release()

if __name__ == '__main__':
    lock = threading.Lock()
    for i in range(100):
        t = threading.Thread(target=lessNum)
        t.start()
        thread_list.append(t)

    for t in thread_list:  # 等待全部线程执行完毕
        t.join()

    print('final num:', NUM)
View Code

 能够看出来,这样修改了以后就像是串行的了,而这个和python没有关系,其余的编程语言也是这样解决的。那么这样,多线程还有意义吗?答案是确定的,由于只有这一部分须要加锁,其余部分则不须要。

死锁与递归锁

 在线程间共享多个资源的时候,若是两个线程分别占有一部分资源而且同时等待对方的资源,就会形成死锁,由于系统判断这部分资源都正在使用,全部这两个线程在无外力做用下将一直等待下去。下面是一个死锁的例子:

import threading, time


class myThread(threading.Thread):
    def doA(self):
        lockA.acquire()
        print(self.name, "gotlockA", time.ctime())
        time.sleep(3)
        lockB.acquire()
        print(self.name, "gotlockB", time.ctime())
        lockB.release()
        lockA.release()

    def doB(self):
        lockB.acquire()
        print(self.name, "gotlockB", time.ctime())
        time.sleep(2)
        lockA.acquire()
        print(self.name, "gotlockA", time.ctime())
        lockA.release()
        lockB.release()

    def run(self):
        self.doA()
        self.doB()


if __name__ == "__main__":
    lockA = threading.Lock()
    lockB = threading.Lock()
    threads = []
    for i in range(5):
        threads.append(myThread())
    for t in threads:
        t.start()
    for t in threads:
        t.join()
View Code

结果为

能够看到程序陷入了阻塞,实际上是发生了死锁。而有一种经常使用的解决方式,就是使用递归锁。

递归锁的格式:

r_lock = threading.RLock()
r_lock = r_lock.acquire()
要锁的内容 r_lock
= r_lock.release()

为了支持在同一线程中屡次请求同一资源,python提供了“可重入锁”:threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源能够被屡次acquire。直到一个线程全部的acquire都被release,其余的线程才能得到资源。也就是说每调用一次acquire方法就加1,每调用一次release方法就减1,没人用的时候是默认的0。

上面例子中加锁以后的样子

import threading, time


class myThread(threading.Thread):
    def doA(self):
        r_lock.acquire()
        print(self.name, "gotlockA", time.ctime())
        time.sleep(3)
        r_lock.acquire()
        print(self.name, "gotlockB", time.ctime())
        r_lock.release()
        r_lock.release()

    def doB(self):
        r_lock.acquire()
        print(self.name, "gotlockB", time.ctime())
        time.sleep(2)
        r_lock.acquire()
        print(self.name, "gotlockA", time.ctime())
        r_lock.release()
        r_lock.release()

    def run(self):
        self.doA()
        self.doB()


if __name__ == "__main__":
    r_lock = threading.RLock()
    threads = []
    for i in range(5):
        threads.append(myThread())
    for t in threads:
        t.start()
    for t in threads:
        t.join()
View Code

同步

同步和异步在开始给的网址中已经说过,这里为了方便起见,仍是在进行说明 

同步异步一般用来形容一次方法调用。

  • 同步方法调用一旦开始,调用者必须等到方法调用返回后,才能继续后续的行为。
  • 异步方法调用更像一个消息传递,一旦开始,方法调用就会当即返回,调用者就能够继续后续的操做。而,异步方法一般会在另一个线程中,“真实”地执行着。整个过程,不会阻碍调用者的工做。

这里实现同步就是对线程进行阻塞,等待另外一个线程将数据处理结束而后解除这个阻塞状态。

建立同步对象:

event = threading.Event()

同步对象的经常使用方法:

event.wait():等待flag被设定,一旦event被设定,等同于pass
event.set():设定flag 

event.clear():清除flag

event.isSet(): 查看当前flag状态

注:一个event能够用在多个线程中。

例子

import threading, time


class Boss(threading.Thread):
    def run(self):
        print("BOSS:今晚你们都要加班到22:00。")
        print(event.isSet())  # False
        event.set()
        time.sleep(5)
        print("BOSS:<22:00>能够下班了。")
        print(event.isSet())
        event.set()


class Worker(threading.Thread):
    def run(self):
        event.wait()  # 一旦event被设定,等同于pass

        print("Worker:哎……命苦啊!")
        time.sleep(1)
        event.clear()
        event.wait()
        print("Worker:OhYeah!")


if __name__ == "__main__":
    event = threading.Event()

    threads = []
    for i in range(5):
        threads.append(Worker())
    threads.append(Boss())
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    print("ending.....")
View Code

信号量

信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数 器,每当调用acquire()时-1,调用release()时+1。

计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其余线程调用release()。(相似于停车位的概念)

BoundedSemaphore与Semaphore的惟一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,若是超过了将抛出一个异常。

 例子:

import threading, time


class myThread(threading.Thread):
    def run(self):
        if semaphore.acquire():
            print(self.name)
            time.sleep(2)
            semaphore.release()


if __name__ == "__main__":
    semaphore = threading.Semaphore(5) 

    thrs = []
    for i in range(100):
        thrs.append(myThread())
    for t in thrs:
        t.start()
View Code

这个信号量默认是1。

不难看出,互斥锁是信号量的一种特殊状况(n=1时)。也就是说,彻底能够用后者替代前者。可是,由于互斥锁较为简单,且效率高,因此在必须保证资源独占的状况下,仍是采用这种设计。

队列

Queue是python标准库中的线程安全的队列(FIFO)实现,提供了一个适用于多线程编程的先进先出的数据结构,即队列,用来在生产者和消费者线程之间的信息传递 

建立一个队列对象

import Queue
q
= Queue.Queue(maxsize = 10) Queue.Queue类便是一个队列的同步实现。队列长度可为无限或者有限。可经过Queue的构造函数的可选参数maxsize来设定队列长度。若是maxsize小于1就表示队列长度无限。

 python Queue模块有三种队列及构造函数

  1. Python Queue模块的FIFO队列先进先出。 class queue.Queue(maxsize)
  2. LIFO相似于堆,即先进后出。 class queue.LifoQueue(maxsize)
  3. 还有一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize)

经常使用的方法

  • q.put(item[, block[, timeout]])
    • 调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为True。若是队列当前为空且block为True,put()方法就使调用线程暂停,直到空出一个数据单元。若是block为False,put方法将引起Full异常。
    • timeout为等待时间
  • q.get([block[, timeout]])
    • 调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。若是队列为空且block为True,get()就使调用线程暂停,直至有项目可用。若是队列为空且block为False,队列将引起Empty异常。
    • timeout为等待时间
  • q.get_nowait()方法

    • 至关Queue.get(False),这种方法在向一个空队列取值的时候会抛一个Empty异常,因此更经常使用的方法是先判断一个队列是否为空,若是不为空则取值

  • q.put_nowait(item)
    • 至关Queue.put(item, False)
  • q.qsize()
    •  返回队列的大小
  • q.empty()
    •  若是队列为空,返回True,反之False
  • q.full()
    • 若是队列满了,返回True,反之False
    • q.full 与 maxsize 大小对应
  • q.task_done() 
    • 消费者线程从队列中get到任务后,任务处理完成,当全部的队列中的任务处理完成后,会使调用queue.join()的线程返回,表示队列中任务以处理完毕。
    • 若是当前一个join()正在阻塞,它将在队列中的全部任务都处理完时恢复执行(即每个由put()调用入队的任务都有一个对应的task_done()调用)。
  • q.join()
    • 阻塞调用线程,直到队列中的全部任务被处理掉。
    • 只要有数据被加入队列,未完成的任务数就会增长。当消费者线程调用task_done()(意味着有消费者取得任务并完成任务),未完成的任务数就会减小。当未完成的任务数降到0,join()解除阻塞。
    • 实际上意味着等到队列为空,再执行别的操做 

除了按照先进先出,还有一个按照优先级的处理顺序

q=queue.PriorityQueue()
q.put([5,100])      # 优先级为5,放入参数为100
q.put([7,200])      # 优先级为7,放入参数为200
q.put([3,"hello"])
q.put([4,{"name":"alex"}])

生产者和消费者模型

 关于队列,经常跟这个生产者和消费者的模型关联起来,由于二者的契合度很高。因此这里也提一下。

为何要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题因而引入了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而经过阻塞队列来进行通信,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力。

这就像,在餐厅,厨师作好菜,不须要直接和客户交流,而是交给前台,而客户去饭菜也不须要不找厨师,直接去前台领取便可,这也是一个结耦的过程。

下面用队列的内容写一个生产者和消费者的例子

import time, random
import queue, threading

q = queue.Queue()


def Producer(name):
    count = 0
    while count < 10:
        print("making........")
        time.sleep(random.randrange(3))
        q.put(count)
        print('Producer %s has produced %s baozi..' % (name, count))
        count += 1
        # q.task_done()
        # q.join()
        print("ok......")


def Consumer(name):
    count = 0
    while count < 10:
        time.sleep(random.randrange(4))
        if not q.empty():
            data = q.get()
            # q.task_done()
            # q.join()
            print(data)
            print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' % (name, data))
        else:
            print("-----no baozi anymore----")
        count += 1


p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
# c2 = threading.Thread(target=Consumer, args=('C',))
# c3 = threading.Thread(target=Consumer, args=('D',))
p1.start()
c1.start()
# c2.start()
# c3.start()
View Code

多进程

 multiprocessing包是Python中的多进程管理包。与threading.Thread相似,它能够利用multiprocessing.Process对象来建立一个进程。该进程能够运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象能够像多线程那样,经过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。因此,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。

调用方式

直接调用

from multiprocessing import Process
import time
def f(name):
    time.sleep(1)
    print('hello', name,time.ctime())

if __name__ == '__main__':
    p_list=[]
    for i in range(3):
        p = Process(target=f, args=('老王',))
        p_list.append(p)
        p.start()
    for i in p_list:
        p.join()
    print('end')
View Code

继承式调用

from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self):
        super(MyProcess, self).__init__()
        #self.name = name

    def run(self):
        time.sleep(1)
        print ('hello', self.name,time.ctime())


if __name__ == '__main__':
    p_list=[]
    for i in range(3):
        p = MyProcess()
        p.start()
        p_list.append(p)

    for p in p_list:
        p.join()

    print('end')
View Code

Process类

构造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 线程组,目前尚未实现,库引用中提示必须是None; 
  target: 要执行的方法; 
  name: 进程名; 
  args/kwargs: 要传入方法的参数。

实例方法:

  is_alive():返回进程是否在运行。

  join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。

  start():进程准备就绪,等待CPU调度

  run():strat()调用run方法,若是实例进程时未制定传入target,这star执行t默认run()方法。

  terminate():无论任务是否完成,当即中止工做进程

属性:

  daemon:和线程的setDeamon功能同样

  name:进程名字。

  pid:进程号。

from multiprocessing import Process
import os
import time


def info(title):
    print("title:", title)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())


def f(name):
    info('function f')
    print('hello', name)


if __name__ == '__main__':

    info('main process line')

    time.sleep(1)
    print("------------------")
    p = Process(target=info, args=('老王',))
    p.start()
    p.join()
View Code

结果以下:

能够从进程号看到父子进程之间的关系,而主进程的父进程不是解释器,我用的工具是pycharm,因此主进程的父进程是pycharm的端口号。

进程间的通讯

进程队列Queue

from multiprocessing import Process, Queue


def f(q, n):
    q.put([123, 456, 'hello'])
    q.put(n * n + 1)
    print("son process", id(q))


if __name__ == '__main__':
    q = Queue()
    print("main process", id(q))

    for i in range(3):
        p = Process(target=f, args=(q, i))
        p.start()

    print(q.get())
    print(q.get())
    print(q.get())
View Code

管道Pipe

pipe()返回两个链接对象表明pipe的两端。每一个链接对象都有send()方法和recv()方法。

可是若是两个进程或线程对象同时读取或写入管道两端的数据时,管道中的数据有可能会损坏。

当进程使用的是管道两端的不一样的数据则不会有数据损坏的风险。

 Pipe()函数返回一个由管道链接的链接对象,默认状况下为双工(双向)。

from multiprocessing import Process, Pipe


def f(conn):
    conn.send([12, {"name": "laowang"}, 'hello'])
    response = conn.recv()
    print("response", response)
    conn.close()
    print("q_ID2:", id(conn))


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # 双向管道

    print("q_ID1:", id(child_conn))
    p = Process(target=f, args=(child_conn,))
    p.start()

    print(parent_conn.recv())
    parent_conn.send("hello")
    p.join()
View Code

Managers

上面的Queue和Pipe只实现了数据交互,没有实现数据共享(即一个进程去改变另外一个进程的数据)。

Manager()返回的manager对象控制了一个server进程,此进程包含的python对象能够被其余的进程经过proxies来访问。从而达到多进程间数据通讯且安全。

Manager支持的类型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。

from multiprocessing import Process, Manager
 
 
def f(d, l, n):
    d[n] = '1'  # {0:"1"}
    d['2'] = 2  # {0:"1","2":2}

    l.append(n)  # [0,1,2,3,4,   0,1,2,3,4,5,6,7,8,9]


if __name__ == '__main__':

    with Manager() as manager:

        d = manager.dict()  # {}
        l = manager.list(range(5))  # [0,1,2,3,4]

        p_list = []

        for i in range(10):
            p = Process(target=f, args=(d, l, i))
            p.start()
            p_list.append(p)

        for res in p_list:
            res.join()

        print(d)    # {0: '1', 1: '1', 2: '1', 3: '1', 4: '1', '2': 2, 6: '1', 7: '1', 8: '1', 9: '1', 5: '1'}
        print(l)    # [0, 1, 2, 3, 4, 0, 2, 3, 1, 4, 5, 6, 7, 9, 8]
View Code

 进程互斥锁

from multiprocessing import Process, Lock
import time


def f(l, i):
    l.acquire()
    time.sleep(1)
    print('hello world %s' % i)
    l.release()


if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()
View Code

 进程池

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,若是进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有两个方法:

  • pool.apply_async(func=Foo, args(i,), callback=demo)

    • 这个是异步的接口,进程并发执行。
    • 第三个为回调函数,每执行完一次函数就调用一次回调函数
  • pool.apply(func=Foo, args(i,), callback=demo)

    • 这个是同步的接口,不管线程池设为多少,都是一个进程一个进程的执行。

    • 第三个为回调函数,每执行完一次函数就调用一次回调函数

注:以前感受回调函数彻底没有做用,让子进程去调用那个函数不就能够了吗,为何要加一个回调函数呢,回调函数的区别就是他是由主进程调用的。这样有什么好处呢。好比开十个进程,咱们须要把这个行为记录下来,也就是咱们常说的日志就能够用回调函数来弄。咱们能够把逻辑之外,并且公用的操做放到这个回调函数中,不用每次都要进程去作与逻辑无关的事情。并且这个回调函数必需要加一个值,就是用来接收进程函数中的return返回的值。

from  multiprocessing import Process, Pool
import time, os


def processPool(i):
    time.sleep(1)
    print(i)
    print("son", os.getpid())

    return "HELLO %s" % i


def Back(arg):
    print(arg)


if __name__ == '__main__':

    pool = Pool(5)  # 参数为进程池中维护的进程数量,不写默认为当前计算机的核心数。
    print("main pid", os.getpid())
    for i in range(100):
        pool.apply_async(func=processPool, args=(i,), callback=Back)

    pool.close()
    pool.join()  # join与close调用顺序是固定的

    print('end')
View Code

 注:pool.close()必须放在pool.join()的前面。

协程

协程,又称微线程,纤程。英文名Coroutine。

协程是一种用户级的轻量级线程。协程拥有本身的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其余地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。所以:

协程能保留上一次调用时的状态(即全部局部状态的一个特定组合),每次过程重入时,就至关于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

在并发编程中,协程与线程相似,每一个协程表示一个执行单元,有本身的本地数据,与其它协程共享全局数据和其它资源。

目前主流语言基本上都选择了多线程做为并发设施,与线程相关的概念是抢占式多任务(Preemptive multitasking),而与协程相关的是协做式多任务。

无论是进程仍是线程,每次阻塞、切换都须要陷入系统调用(system call),先让CPU跑操做系统的调度程序,而后再由调度程序决定该跑哪个进程(线程)。
并且因为抢占式调度执行顺序没法肯定的特色,使用线程时须要很是当心地处理同步问题,而协程彻底不存在这个问题(事件驱动和异步程序也有一样的优势)。

由于协程是用户本身来编写调度逻辑的,对CPU来讲,协程实际上是单线程,因此CPU不用去考虑怎么调度、切换上下文,这就省去了CPU的切换开销,因此协程在必定程度上又好于多线程。


优势1: 协程有极高的执行效率。由于子程序切换不是线程切换,而是由程序自身控制,所以,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优点就越明显。
优势2: 不须要多线程的锁机制,协程是非抢占式的,由于只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只须要判断状态就行了,因此执行效率比多线程高不少。
由于协程是一个线程执行,那怎么利用多核CPU呢?最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可得到极高的性能。

协程的原理就是python中的生成器

首先用yield完成简单的生产者和消费者模型

import time
import queue


def consumer(name):
    print("--->ready to eat baozi...")
    while True:
        new_baozi = yield
        print("[%s] is eating baozi %s" % (name, new_baozi))
        # time.sleep(1)


def producer():
    r = con.__next__()
    r = con2.__next__()
    n = 0
    while 1:
        time.sleep(1)
        print("\033[32;1m[producer]\033[0m is making baozi %s and %s" % (n, n + 1))
        con.send(n)
        con2.send(n + 1)

        n += 2


if __name__ == '__main__':
    con = consumer("c1")
    con2 = consumer("c2")
    p = producer()
View Code

Greenlet

greenlet是一个用C实现的协程模块,相比与python自带的yield,它可使你在任意函数之间随意切换,而不需把这个函数先声明为generator

from greenlet import greenlet


def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()


def test2():
    print(56)
    gr1.switch()
    print(78)


gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()


# 结果
# 12
# 56
# 34
# 78
例子

Gevent

咱们能够看到能够用greenlet进行了切换,可是还不够,有个更进一步的封装gevent

greenlet咱们还有手动切换,而gevent能够自动的切换,

import gevent
import time


def f(num):
    for i in num:
        time.sleep(0.1)
        print(i)


if __name__ == '__main__':
    gevent.joinall([
        gevent.spawn(f, [1, 2, 3, 4, 5]),
        gevent.spawn(f, "helloworld")
    ])
View Code

注:进程池multiprocessing.Pool和gevent有冲突不能同时使用,有兴趣的能够研究gevent.pool协程池。

猴子补丁

使用Gevent的性能确实要比用传统的线程高,甚至高不少。但这里不得不说它的一个坑那就是猴子补丁

使用方法:

from gevent import monkey
monkey.patch_all()

(1)猴子补丁的由来

         猴子补丁的这个叫法起源于Zope框架,你们在修正Zope的Bug的时候常常在程序后面追加更新部分,这些被称做是“杂牌军补丁(guerillapatch)”,后来guerilla就渐渐的写成了gorllia(猩猩),再后来就写了monkey(猴子),因此猴子补丁的叫法是这么莫名其妙的得来的。

         后来在动态语言中,不改变源代码而对功能进行追加和变动,统称为“猴子补丁”。因此猴子补丁并非Python中专有的。猴子补丁这种东西充分利用了动态语言的灵活性,能够对现有的语言Api进行追加,替换,修改Bug,甚至性能优化等等。

  使用猴子补丁的方式,gevent可以修改标准库里面大部分的阻塞式系统调用,包括socket、ssl、threading和 select等模块,而变为协做式运行。也就是经过猴子补丁的monkey.patch_xxx()来将python标准库中模块或函数改为gevent中的响应的具备协程的协做式对象(通常写为gevent.monkey.patch_all())。这样在不改变原有代码的状况下,将应用的阻塞式方法,变成协程式的。

(2)猴子补丁使用时的注意事项

猴子补丁的功能很强大,可是也带来了不少的风险,尤为是像gevent这种直接进行API替换的补丁,整个Python进程所使用的模块都会被替换,可能本身的代码能hold住,可是其它第三方库,有时候问题并很差排查,即便排查出来也是很棘手,因此,就像松本建议的那样,若是要使用猴子补丁,那么只是作功能追加,尽可能避免大规模的API覆盖。或者得确保项目中用到其余用到的网络库也必须使用纯Python或者明确说明支持Gevent

补坑

在前面说了为了减小GIL锁对高并发的程序产生的影响,不少人想了不少办法。

举个例子:给你200W条url,须要你把每一个url对应的页面抓取保存起来,这种时候,单单使用多进程,效果确定是不好的。为何呢?

例如每次请求的等待时间是2秒,那么以下(忽略cpu计算时间):

一、单进程+单线程:须要2秒*200W=400W秒==1111.11个小时==46.3天,这个速度明显是不能接受的

二、单进程+多线程:例如咱们在这个进程中开了10个多线程,比1中可以提高10倍速度,也就是大约4.63天可以完成200W条抓取,请注意,这里的实际执行是:线程1碰见了阻塞,CPU切换到线程2去执行,碰见阻塞又切换到线程3等等,10个线程都阻塞后,这个进程就阻塞了,而直到某个线程阻塞完成后,这个进程才能继续执行,因此速度上提高大约能到10倍(这里忽略了线程切换带来的开销,实际上的提高应该是不能达到10倍的),可是须要考虑的是线程的切换也是有开销的,因此不能无限的启动多线程(开200W个线程确定是不靠谱的)

三、多进程+多线程:这里就厉害了,通常来讲也有不少人用这个方法,多进程下,每一个进程都能占一个cpu,而多线程从必定程度上绕过了阻塞的等待,因此比单进程下的多线程又更好使了,例如咱们开10个进程,每一个进程里开20W个线程,执行的速度理论上是比单进程开200W个线程快10倍以上的(为何是10倍以上而不是10倍,主要是cpu切换200W个线程的消耗确定比切换20W个进程大得多,考虑到这部分开销,因此是10倍以上)。

而根据前面对协程的解释,它是不须要没有切换线程的开销的。这个时候使用多进程+协程(能够看做是每一个进程里都是单线程,而这个单线程是协程化的)

多进程+协程下,避开了CPU切换的开销,又能把多个CPU充分利用起来,这种方式对于数据量较大的爬虫还有文件读写之类的效率提高是巨大的。

可是上面的内容也只是针对这种IO密集型,计算密集型仍是多进程+单线程跑吧,没辙,这样还要快一些。

#-*- coding=utf-8 -*-
import requests
from multiprocessing import Process
import gevent
from gevent import monkey; monkey.patch_all()
 
import sys
reload(sys)
sys.setdefaultencoding('utf8')
def fetch(url):
    try:
        s = requests.Session()
        r = s.get(url,timeout=1)#在这里抓取页面
    except Exception,e:
        print e 
    return ''
 
def process_start(url_list):
    tasks = []
    for url in url_list:
        tasks.append(gevent.spawn(fetch,url))
    gevent.joinall(tasks)#使用协程来执行
 
def task_start(filepath,flag = 100000):#每10W条url启动一个进程
    with open(filepath,'r') as reader:#从给定的文件中读取url
        url = reader.readline().strip()
        url_list = []#这个list用于存放协程任务
        i = 0 #计数器,记录添加了多少个url到协程队列
        while url!='':
            i += 1
            url_list.append(url)#每次读取出url,将url添加到队列
            if i == flag:#必定数量的url就启动一个进程并执行
                p = Process(target=process_start,args=(url_list,))
                p.start()
                url_list = [] #重置url队列
                i = 0 #重置计数器
            url = reader.readline().strip()
        if url_list not []:#若退出循环后任务队列里还有url剩余
            p = Process(target=process_start,args=(url_list,))#把剩余的url全都放到最后这个进程来执行
            p.start()
  
if __name__ == '__main__':
    task_start('./testData.txt')#读取指定文件
一个例子