Python 多线程教程:并发与并行

在批评Python的讨论中,经常提及Python多线程是多么的难用。还有人对 global interpreter lock(也被亲切的称为“GIL”)指指点点,说它阻碍了Python的多线程程序同时运行。所以,若是你是从其余语言(好比C++或Java)转过来的话,Python线程模块并不会像你想象的那样去运行。必需要说明的是,咱们仍是能够用Python写出能并发或并行的代码,而且能带来性能的显著提高,只要你能顾及到一些事情。若是你还没看过的话,我建议你看看Eqbal Quran的文章《Ruby中的并发和并行》。html

在本文中,咱们将会写一个小的Python脚本,用于下载Imgur上最热门的图片。咱们将会从一个按顺序下载图片的版本开始作起,即一个一个地下载。在那以前,你得注册一个Imgur上的应用。若是你尚未Imgur帐户,请先注册一个。python

本文中的脚本在Python3.4.2中测试经过。稍微改一下,应该也能在Python2中运行——urllib是两个版本中区别最大的部分。git

一、开始动手

让咱们从建立一个叫“download.py”的Python模块开始。这个文件包含了获取图片列表以及下载这些图片所需的全部函数。咱们将这些功能分红三个单独的函数:github

  • get_linksredis

  • download_link编程

  • setup_download_dirjson

第三个函数,“setup_download_dir”,用于建立下载的目标目录(若是不存在的话)。segmentfault

Imgur的API要求HTTP请求能支持带有client ID的“Authorization”头部。你能够从你注册的Imgur应用的面板上找到这个client ID,而响应会以JSON进行编码。咱们可使用Python的标准JSON库去解码。下载图片更简单,你只须要根据它们的URL获取图片,而后写入到一个文件便可。api

代码以下:安全

import json
import logging
import os
from pathlib import Path
from urllib.request import urlopen, Request
 
logger = logging.getLogger(__name__)
 
def get_links(client_id):
   headers = {'Authorization': 'Client-ID {}'.format(client_id)}
   req = Request('https://api.imgur.com/3/gallery/', headers=headers, method='GET')
   with urlopen(req) as resp:
       data = json.loads(resp.readall().decode('utf-8'))
   return map(lambda item: item['link'], data['data'])
 
def download_link(directory, link):
   logger.info('Downloading %s', link)
   download_path = directory / os.path.basename(link)
   with urlopen(link) as image, download_path.open('wb') as f:
       f.write(image.readall())
 
def setup_download_dir():
   download_dir = Path('images')
   if not download_dir.exists():
       download_dir.mkdir()
   return download_dir

接下来,你须要写一个模块,利用这些函数去逐个下载图片。咱们给它命名为“single.py”。它包含了咱们最原始版本的Imgur图片下载器的主要函数。这个模块将会经过环境变量“IMGUR_CLIENT_ID”去获取Imgur的client ID。它将会调用“setup_download_dir”去建立下载目录。最后,使用get_links函数去获取图片的列表,过滤掉全部的GIF和专辑URL,而后用“download_link”去将图片下载并保存在磁盘中。下面是“single.py”的代码:

import logging
import os
from time import time
 
from download import setup_download_dir, get_links, download_link
 
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)
 
def main():
   ts = time()
   client_id = os.getenv('IMGUR_CLIENT_ID')
   if not client_id:
       raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
   download_dir = setup_download_dir()
   links = [l for l in get_links(client_id) if l.endswith('.jpg')]
   for link in links:
       download_link(download_dir, link)
   print('Took {}s'.format(time() - ts))
 
if __name__ == '__main__':
   main()

注:为了测试方便,上面两段代码能够用以下代码替代演示:

# coding=utf-8
#测试utf-8编码
from time import sleep, time
import sys, threading

reload(sys)
sys.setdefaultencoding('utf-8')


def getNums(N):
    return xrange(N)


def processNum(num):
    num_add = num + 1
    sleep(1)
    print str(threading.current_thread()) + ": " + str(num) + " → " + str(num_add)


if __name__ == "__main__":
    t1 = time()
    for i in getNums(3):
        processNum(i)

    print "cost time is: {:.2f}s".format(time() - t1)

结果:

<_MainThread(MainThread, started 4436)>: 0 → 1
<_MainThread(MainThread, started 4436)>: 1 → 2
<_MainThread(MainThread, started 4436)>: 2 → 3
cost time is: 3.00s

在个人笔记本上,这个脚本花了19.4秒去下载91张图片。请注意这些数字在不一样的网络上也会有所不一样。19.4秒并非很是的长,可是若是咱们要下载更多的图片怎么办呢?或许是900张而不是90张。平均下载一张图片要0.2秒,900张的话大概须要3分钟。那么9000张图片将会花掉30分钟。好消息是使用了并发或者并行后,咱们能够将这个速度显著地提升。

接下来的代码示例将只会显示导入特有模块和新模块的import语句。全部相关的Python脚本均可以在这方便地找到this GitHub repository

二、使用线程

线程是最出名的实现并发和并行的方式之一。操做系统通常提供了线程的特性。线程比进程要小,并且共享同一块内存空间。

在这里,咱们将写一个替代“single.py”的新模块。它将建立一个有八个线程的池,加上主线程的话总共就是九个线程。之因此是八个线程,是由于个人电脑有8个CPU内核,而一个工做线程对应一个内核看起来还不错。在实践中,线程的数量是仔细考究的,须要考虑到其余的因素,好比在同一台机器上跑的的其余应用和服务。

下面的脚本几乎跟以前的同样,除了咱们如今有个新的类,DownloadWorker,一个Thread类的子类。运行无限循环的run方法已经被重写。在每次迭代时,它调用“self.queue.get()”试图从一个线程安全的队列里获取一个URL。它将会一直堵塞,直到队列中出现一个要处理元素。一旦工做线程从队列中获得一个元素,它将会调用以前脚本中用来下载图片到目录中所用到的“download_link”方法。下载完成以后,工做线程向队列发送任务完成的信号。这很是重要,由于队列一直在跟踪队列中的任务数。若是工做线程没有发出任务完成的信号,“queue.join()”的调用将会令整个主线程都在阻塞状态。

from queue import Queue
from threading import Thread
 
class DownloadWorker(Thread):
   def __init__(self, queue):
       Thread.__init__(self)
       self.queue = queue
 
   def run(self):
       while True:
           # Get the work from the queue and expand the tuple
           # 从队列中获取任务并扩展tuple
           directory, link = self.queue.get()
           download_link(directory, link)
           self.queue.task_done()
 
def main():
   ts = time()
   client_id = os.getenv('IMGUR_CLIENT_ID')
   if not client_id:
       raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
   download_dir = setup_download_dir()
   links = [l for l in get_links(client_id) if l.endswith('.jpg')]
   # Create a queue to communicate with the worker threads
   queue = Queue()
   # Create 8 worker threads
   # 建立八个工做线程
   for x in range(8):
       worker = DownloadWorker(queue)
       # Setting daemon to True will let the main thread exit even though the workers are blocking
       # 将daemon设置为True将会使主线程退出,即便worker都阻塞了
       worker.daemon = True
       worker.start()
   # Put the tasks into the queue as a tuple
   # 将任务以tuple的形式放入队列中
   for link in links:
       logger.info('Queueing {}'.format(link))
       queue.put((download_dir, link))
   # Causes the main thread to wait for the queue to finish processing all the tasks
   # 让主线程等待队列完成全部的任务
   queue.join()
   print('Took {}'.format(time() - ts))

注:为了测试方便,上面的代码能够用以下代码替代演示:

# coding=utf-8
#测试utf-8编码
from Queue import Queue
from threading import Thread
from single import *
import sys

reload(sys)
sys.setdefaultencoding('utf-8')


class ProcessWorker(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            # Get the work from the queue
            num = self.queue.get()
            processNum(num)
            self.queue.task_done()


def main():
    ts = time()
    nums = getNums(4)
    # Create a queue to communicate with the worker threads
    queue = Queue()
    # Create 4 worker threads
    # 建立四个工做线程
    for x in range(4):
        worker = ProcessWorker(queue)
        # Setting daemon to True will let the main thread exit even though the workers are blocking
        # 将daemon设置为True将会使主线程退出,即便worker都阻塞了
        worker.daemon = True
        worker.start()
    # Put the tasks into the queue
    for num in nums:
        queue.put(num)
    # Causes the main thread to wait for the queue to finish processing all the tasks
    # 让主线程等待队列完成全部的任务
    queue.join()
    print("cost time is: {:.2f}s".format(time() - ts))


if __name__ == "__main__":
    main()

结果:

<ProcessWorker(Thread-4, started daemon 3900)>: 3 → 4<ProcessWorker(Thread-1, started daemon 3436)>: 2 → 3<ProcessWorker(Thread-3, started daemon 4576)>: 1 → 2

 
<ProcessWorker(Thread-2, started daemon 396)>: 0 → 1
cost time is: 1.01s

在同一个机器上运行这个脚本,下载时间变成了4.1秒!即比以前的例子快4.7倍。虽然这快了不少,但仍是要提一下,因为GIL的缘故,在这个进程中同一时间只有一个线程在运行。所以,这段代码是并发的但不是并行的。而它仍然变快的缘由是这是一个IO密集型的任务。进程下载图片时根本绝不费力,而主要的时间都花在了等待网络上。这就是为何线程能够提供很大的速度提高。每当线程中的一个准备工做时,进程能够不断转换线程。使用Python或其余有GIL的解释型语言中的线程模块实际上会下降性能。若是你的代码执行的是CPU密集型的任务,例如解压gzip文件,使用线程模块将会致使执行时间变长。对于CPU密集型任务和真正的并行执行,咱们可使用多进程(multiprocessing)模块。

官方的Python实现——CPython——带有GIL,但不是全部的Python实现都是这样的。好比,IronPython,使用.NET框架实现的Python就没有GIL,基于Java实现的Jython也一样没有。你能够点这查看现有的Python实现。

三、生成多进程

多进程模块比线程模块更易使用,由于咱们不须要像线程示例那样新增一个类。咱们惟一须要作的改变在主函数中。

为了使用多进程,咱们得创建一个多进程池。经过它提供的map方法,咱们把URL列表传给池,而后8个新进程就会生成,它们将并行地去下载图片。这就是真正的并行,不过这是有代价的。整个脚本的内存将会被拷贝到各个子进程中。在咱们的例子中这不算什么,可是在大型程序中它很容易致使严重的问题。

from functools import partial
from multiprocessing.pool import Pool
 
def main():
   ts = time()
   client_id = os.getenv('IMGUR_CLIENT_ID')
   if not client_id:
       raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
   download_dir = setup_download_dir()
   links = [l for l in get_links(client_id) if l.endswith('.jpg')]
   download = partial(download_link, download_dir)
   with Pool(8) as p:
       p.map(download, links)
   print('Took {}s'.format(time() - ts))

注:为了测试方便,上面的代码能够用以下代码替代演示:

# coding=utf-8
#测试utf-8编码
from functools import partial
from multiprocessing.pool import Pool
from single import *
from time import time

import sys

reload(sys)
sys.setdefaultencoding('utf-8')


def main():
    ts = time()
    nums = getNums(4)
    p = Pool(4)
    p.map(processNum, nums)
    print("cost time is: {:.2f}s".format(time() - ts))


if __name__ == "__main__":
    main()

结果:

<_MainThread(MainThread, started 6188)>: 0 → 1
<_MainThread(MainThread, started 3584)>: 1 → 2
<_MainThread(MainThread, started 2572)>: 3 → 4<_MainThread(MainThread, started 4692)>: 2 → 3

cost time is: 1.21s

四、分布式任务

你已经知道了线程和多进程模块能够给你本身的电脑跑脚本时提供很大的帮助,那么在你想要在不一样的机器上执行任务,或者在你须要扩大规模而超过一台机器的的能力范围时,你该怎么办呢?一个很好的使用案例是网络应用的长时间后台任务。若是你有一些很耗时的任务,你不会但愿在同一台机器上占用一些其余的应用代码所须要的子进程或线程。这将会使你的应用的性能降低,影响到你的用户们。若是能在另一台甚至不少台其余的机器上跑这些任务就行了。

Python库RQ很是适用于这类任务。它是一个简单却很强大的库。首先将一个函数和它的参数放入队列中。它将函数调用的表示序列化(pickle),而后将这些表示添加到一个Redis列表中。任务进入队列只是第一步,什么都尚未作。咱们至少还须要一个能去监放任务队列的worker(工做线程)。

第一步是在你的电脑上安装和使用Redis服务器,或是拥有一台能正常的使用的Redis服务器的使用权。接着,对于现有的代码只须要一些小小的改动。先建立一个RQ队列的实例并经过redis-py 库传给一台Redis服务器。而后,咱们执行“q.enqueue(download_link, download_dir, link)”,而不仅是调用“download_link” 。enqueue方法的第一个参数是一个函数,当任务真正执行时,其余的参数或关键字参数将会传给该函数。

最后一步是启动一些worker。RQ提供了方便的脚本,能够在默认队列上运行起worker。只要在终端窗口中执行“rqworker”,就能够开始监听默认队列了。请确认你当前的工做目录与脚本所在的是同一个。若是你想监听别的队列,你能够执行“rqworker queue_name”,而后将会开始执行名为queue_name的队列。RQ的一个很好的点就是,只要你能够链接到Redis,你就能够在任意数量上的机器上跑起任意数量的worker;所以,它可让你的应用扩展性获得提高。下面是RQ版本的代码:

from redis import Redis
from rq import Queue
 
def main():
   client_id = os.getenv('IMGUR_CLIENT_ID')
   if not client_id:
       raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
   download_dir = setup_download_dir()
   links = [l for l in get_links(client_id) if l.endswith('.jpg')]
   q = Queue(connection=Redis(host='localhost', port=6379))
   for link in links:
       q.enqueue(download_link, download_dir, link)

然而RQ并非Python任务队列的惟一解决方案。RQ确实易用而且能在简单的案例中起到很大的做用,可是若是有更高级的需求,咱们可使用其余的解决方案(例如 Celery)。

五、总结

若是你的代码是IO密集型的,线程和多进程能够帮到你。多进程比线程更易用,可是消耗更多的内存。若是你的代码是CPU密集型的,多进程就明显是更好的选择——特别是所使用的机器是多核或多CPU的。对于网络应用,在你须要扩展到多台机器上执行任务,RQ是更好的选择。

六、注:关于并发、并行区别与联系

  • 并发是指,程序在运行的过程当中存在多于一个的执行上下文。这些执行上下文通常对应着不一样的调用栈。

在单处理器上,并发程序虽然有多个上下文运行环境,但某一个时刻只有一个任务在运行。

但在多处理器上,由于有了多个执行单元,就能够同时有数个任务在跑。

  • 这种物理上同一时刻有多个任务同时运行的方式就是并行。

和并发相比,并行更增强调多个任务同时在运行。

并且并行还有一个层次问题,好比是指令间的并行仍是任务间的并行。

七、Refer:

[1] Python Multithreading Tutorial: Concurrency and Parallelism

http://www.toptal.com/python/beginners-guide-to-concurrency-and-parallelism-in-python

[2] 串行(Sequential)、并发(Concurrent)、并行(parallel)与分布式(distributed)

http://www.lingcc.com/2011/12/28/11918/

[3] 说说这篇「我为何从 Python 转向 Go」

http://t.cn/R2L0lyu

[4] Python 中的进程、线程、协程、同步、异步、回调

http://python.jobbole.com/81692/

[5] 异步等待的 Python 协程

http://segmentfault.com/a/1190000003076472

[6] Python多进程编程

http://python.jobbole.com/82045/

[7] Python线程指南

http://python.jobbole.com/82105/

[8] 使用Python进行并发编程

http://bit.ly/29hd8dq

相关文章
相关标签/搜索