Python 的并发编程

本文最早发布在博客: https://blog.ihypo.net/151628...

这篇文章将讲解 Python 并发编程的基本操做。并发和并行是对孪生兄弟,概念常常混淆。并发是指可以多任务处理,并行则是是可以同时多任务处理。Erlang 之父 Joe Armstrong 有一张很是有趣的图说明这两个概念:
html

我我的更喜欢的一种说法是:并发是宏观并行而微观串行。python

GIL

虽然 Python 自带了很好的类库支持多线程/进程编程,但众所周知,由于 GIL 的存在,Python 很难作好真正的并行。编程

GIL 指全局解释器锁,对于 GIL 的介绍:安全

全局解释器锁(英语:Global Interpreter Lock,缩写GIL),是计算机程序设计语言解释器用于同步线程的一种机制,它使得任什么时候刻仅有一个线程在执行。多线程

  • 维基百科

其实与其说 GIL 是 Python 解释器的限制,不如说是 CPython 的限制,由于 Python 为了保障性能,底层大多使用 C 实现的,而 CPython 的内存管理并非线程安全的,为了保障总体的线程安全,解释器便禁止多线程的并行执行。并发

由于 Python 社区认为操做系统的线程调度已经很是成熟了,没有必要本身再实现一遍,所以 Python 的线程切换基本是依赖操做系统,在实际的使用中,对于单核 CPU,GIL 并无太大的影响,但对于多核 CPU 却引入了线程颠簸(thrashing)问题。app

线程颠簸是指做为单一资源的 GIL 锁,在被多核心竞争强占时资源额外消耗的现象。dom

好比下图,线程1 在释放 GIL 锁后,操做系统唤醒了 线程2,并将 线程2 分配给 核心2 执行,可是若是此时 线程2 却没有成功得到 GIL 锁,只能再次被挂起。此时切换线程、切换上下文的资源都将白白浪费。异步

所以,Python 多线程程序在多核 CPU 机器下的性能不必定比单核高。那么若是是计算密集型的程序,通常仍是考虑用 C 重写关键部分,或者使用多进程避开 GIL。async

多线程

在 Python 中使用多线程,有 threadthreading 可供原则,thread 提供了低级别的、原始的线程以及一个简单的锁,由于 thread 过于简陋,线程管理容易出现人为失误,所以官方更建议使用 threading,而 threading 也不过是对 thread 的封装和补充。(Python3 中 thread 被更名为 _thread)。

在 Python 中建立线程很是简单:

import time
import threading


def do_task(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(1)
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 建立 task
        tasks.append(threading.Thread(
            target=do_task,
            args=("task_{}".format(i),)))
    for t in tasks:
        # 开始执行 task
        t.start()

    for t in tasks:
        # 等待 task 执行完毕
        # 完毕前会阻塞住主线程
        t.join()
    print("Finish.")

直接建立线程简单优雅,若是逻辑复杂,也能够经过继承 Thread 基类完成多线程:

import time
import threading


class MyTask(threading.Thread):
    def __init__(self, task_name):
        super(MyTask, self).__init__()
        self.task_name = task_name

    def run(self):
        print("Get task: {}".format(self.task_name))
        time.sleep(1)
        print("Finish task: {}".format(self.task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 建立 task
        tasks.append(MyTask("task_{}".format(i)))
    for t in tasks:
        # 开始执行 task
        t.start()

    for t in tasks:
        # 等待 task 执行完毕
        # 完毕前会阻塞住主线程
        t.join()
    print("Finish.")

多进程

在 Python 中,可使用 multiprocessing 库来实现多进程编程,和多线程同样,有两种方法可使用多进程编程。

直接建立进程:

import time
import random
import multiprocessing


def do_something(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(random.randint(1, 5))
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 建立 task
        tasks.append(multiprocessing.Process(
            target=do_something,
            args=("task_{}".format(i),)))
    for t in tasks:
        # 开始执行 task
        t.start()

    for t in tasks:
        # 等待 task 执行完毕
        # 完毕前会阻塞住主线程
        t.join()
    print("Finish.")

继承进程父类:

import time
import random
import multiprocessing


class MyTask(multiprocessing.Process):
    def __init__(self, task_name):
        super(MyTask, self).__init__()
        self.task_name = task_name

    def run(self):
        print("Get task: {}".format(self.task_name))
        time.sleep(random.randint(1, 5))
        print("Finish task: {}".format(self.task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 建立 task
        tasks.append(MyTask("task_{}".format(i)))
    for t in tasks:
        # 开始执行 task
        t.start()

    for t in tasks:
        # 等待 task 执行完毕
        # 完毕前会阻塞住主线程
        t.join()
    print("Finish.")

multiprocessing 除了经常使用的多进程编程外,我认为它最大的意义在于提供了一套规范,在该库下有一个 dummy 模块,即 multiprocessing.dummy,里面对 threading 进行封装,提供了和 multiprocessing 相同 API 的线程实现,换句话说,class::multiprocessing.Process 提供的是进程任务类,而 class::multiprocessing.dummy.Process,也正是有 multiprocessing.dummy 的存在,能够快速的讲一个多进程程序改成多线程:

import time
import random
from multiprocessing.dummy import Process


class MyTask(Process):
    def __init__(self, task_name):
        super(MyTask, self).__init__()
        self.task_name = task_name

    def run(self):
        print("Get task: {}".format(self.task_name))
        time.sleep(random.randint(1, 5))
        print("Finish task: {}".format(self.task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 建立 task
        tasks.append(MyTask("task_{}".format(i)))
    for t in tasks:
        # 开始执行 task
        t.start()

    for t in tasks:
        # 等待 task 执行完毕
        # 完毕前会阻塞住主线程
        t.join()
    print("Finish.")

不管是多线程仍是多进程编程,这也是我通常会选择 multiprocessing 的缘由。

除了直接建立进程,还能够用进程池(或者 multiprocessing.dummy 里的进程池):

import time
import random
from multiprocessing import Pool


def do_task(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(random.randint(1, 5))
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    pool = Pool(5)
    for i in range(0, 10):
        #     建立 task
        pool.apply_async(do_task, ("task_{}".format(i),))
    pool.close()
    pool.join()
    print("Finish.")

线程池:

import time
import random
from multiprocessing.dummy import Pool


def do_task(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(random.randint(1, 5))
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    pool = Pool(5)
    for i in range(0, 10):
        #     建立 task
        pool.apply_async(do_task, ("task_{}".format(i),))
    pool.close()
    pool.join()
    print("Finish.")

这里示例有个问题,pool 在 join 前须要 close 掉,不然就会抛出异常,不过 Python 之禅的做者 Tim Peters 给出解释:

As to Pool.close(), you should call that when - and only when - you're never going to submit more work to the Pool instance. So Pool.close() is typically called when the parallelizable part of your main program is finished. Then the worker processes will terminate when all work already assigned has completed.

It's also excellent practice to call Pool.join() to wait for the worker processes to terminate. Among other reasons, there's often no good way to report exceptions in parallelized code (exceptions occur in a context only vaguely related to what your main program is doing), and Pool.join() provides a synchronization point that can report some exceptions that occurred in worker processes that you'd otherwise never see.

同步原语

在多进程编程中,由于进程间的资源隔离,不须要考虑内存的线程安全问题,而在多线程编程中便须要同步原语来保存线程安全,由于 Python 是一门简单的语言,不少操做都是封装的操做系统 API,所以支持的同步原语蛮全,但这里只写两种常见的同步原语:锁和信号量。

经过使用锁能够用来保护一段内存空间,而信号量能够被多个线程共享。

threading 中能够看到 Lock 锁和 RLock 重用锁两种锁,区别如名。这两种锁都只能被一个线程拥有,第一种锁只能被得到一次,而重用锁能够被屡次得到,但也须要一样次数的释放才能真正的释放。

当多个线程对同一块内存空间同时进行修改的时候,常常遇到奇怪的问题:

import time
import random
from threading import Thread, Lock

count = 0


def do_task():
    global count
    time.sleep(random.randint(1, 10) * 0.1)
    tmp = count
    tmp += 1
    time.sleep(random.randint(1, 10) * 0.1)
    count = tmp
    print(count)


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        tasks.append(Thread(target=do_task))
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
    print("Finish. Count = {}".format(count))

如上就是典型的非线程安全致使 count 没有达到预期的效果。而经过锁即可以控制某一段代码,或者说某段内存空间的访问:

import time
import random
from threading import Thread, Lock

count = 0
lock = Lock()


def do_task():
    lock.acquire()
    global count
    time.sleep(random.randint(1, 10) * 0.1)
    tmp = count
    tmp += 1
    time.sleep(random.randint(1, 10) * 0.1)
    count = tmp
    print(count)
    lock.release()


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        tasks.append(Thread(target=do_task))
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
    print("Finish. Count = {}".format(count))

固然,上述例子很是暴力,直接强行把并发改成串行。

对于信号量常见于有限资源强占的场景,能够定义固定大小的信号量供多个线程获取或者释放,从而控制线程的任务执行,好比下面的例子,控制最多有 5 个任务在执行:

import time
import random
from threading import Thread, BoundedSemaphore

sep = BoundedSemaphore(5)


def do_task(task_name):
    sep.acquire()
    print("do Task: {}".format(task_name))
    time.sleep(random.randint(1, 10))
    sep.release()


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        tasks.append(Thread(target=do_task, args=("task_{}".format(i),)))
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
    print("Finish.")

Queue 和 Pipe

由于多进程的内存隔离,不会存在内存竞争的问题。但同时,多个进程间的数据共享成为了新的问题,而进程间通讯常见:队列,管道,信号。

这里只讲解队列和管道。

队列常见于双进程模型,通常用做生产者-消费者模式,由生产者进程向队列中发布任务,并由消费者从队列首部拿出任务进行执行:

import time
from multiprocessing import Process, Queue


class Task1(Process):
    def __init__(self, queue):
        super(Task1, self).__init__()
        self.queue = queue

    def run(self):
        item = self.queue.get()
        print("get item: [{}]".format(item))


class Task2(Process):
    def __init__(self, queue):
        super(Task2, self).__init__()
        self.queue = queue

    def run(self):
        print("put item: [Hello]")
        time.sleep(1)
        self.queue.put("Hello")


if __name__ == "__main__":
    queue = Queue()
    t1 = Task1(queue)
    t2 = Task2(queue)
    t1.start()
    t2.start()
    t1.join()
    print("Finish.")

理论上每一个进程均可以向队列里的读或者写,能够认为队列是半双工路线。可是每每只有特定的读进程(好比消费者)和写进程(好比生产者),尽管这些进程只是开发者本身定义的。

而 Pipe 更像一个全工路线:

import time
from multiprocessing import Process, Pipe


class Task1(Process):
    def __init__(self, pipe):
        super(Task1, self).__init__()
        self.pipe = pipe

    def run(self):
        item = self.pipe.recv()
        print("Task1: recv item: [{}]".format(item))
        print("Task1: send item: [Hi]")
        self.pipe.send("Hi")


class Task2(Process):
    def __init__(self, pipe):
        super(Task2, self).__init__()
        self.pipe = pipe

    def run(self):
        print("Task2: send item: [Hello]")
        time.sleep(1)
        self.pipe.send("Hello")
        time.sleep(1)
        item = self.pipe.recv()
        print("Task2: recv item: [{}]".format(item))


if __name__ == "__main__":
    pipe = Pipe()
    t1 = Task1(pipe[0])
    t2 = Task2(pipe[1])
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("Finish.")

除了上面介绍的 threadingmultiprocessing 两个库外,还有一个好用的使人发指的库 concurrent.futures。和前面两个库不一样,这个库是更高等级的抽象,隐藏了不少底层的东西,但也所以很是好用。用官方的例子:

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())

该库中自带了进程池和线程池,能够经过上下文管理器来管理,并且对于异步任务执行完后,结果的得到也很是简单。再拿一个官方的多进程计算的例子做为结束:

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

欢迎关注我的公众号:CS实验室

相关文章
相关标签/搜索