Python 3 Quick Start (9): Concurrent Programming in Python 3

1. Python Thread Modules

1.1 Introduction to Threads

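A standard thread consists of a thread ID, the current instruction pointer (PC), a register set, and a stack. A thread is an entity within a process and is the basic unit that the system independently schedules and dispatches. A thread owns essentially no system resources of its own (only what it needs to run, such as its registers and stack) and shares all of the process's resources with the other threads in the same process. Every process has at least one thread, which serves as the program's entry point and is called the main thread; the other threads are called worker threads.
Multithreading refers to software or hardware techniques for executing multiple threads concurrently. Computers with hardware support for multithreading can execute several threads at the same moment, which improves overall throughput.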

1.2 Thread States

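A thread has three basic states: ready, blocked, and running. Ready means the thread has everything it needs to run and is waiting for the CPU; running means the thread holds the CPU and is executing; blocked means the thread is waiting for some event (for example, an I/O operation to complete) and logically cannot run.
The transitions among the three states are shown in the figure below. In short: a ready thread becomes running when the scheduler picks it; a running thread goes back to ready when it is preempted or its time slice ends, and becomes blocked when it waits for an event; a blocked thread becomes ready again once that event occurs.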
[Figure: transitions among the ready, running, and blocked thread states]

1.3 The threading Module

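Python 3 supports threads through two modules: _thread and threading.
_thread provides low-level, primitive threads and a simple lock. Its functionality is limited compared with threading; it is the Python 3 name of Python 2's thread module and is kept mainly for compatibility.
Besides the functionality of _thread, the threading module provides the following functions:
threading.current_thread() (older alias currentThread()): returns the Thread object of the calling thread.
threading.enumerate(): returns a list of all threads that are currently alive. "Alive" means started and not yet finished; threads that have not been started or have already terminated are not included.
threading.active_count() (older alias activeCount()): returns the number of live threads, the same as len(threading.enumerate()).
The Thread class provides the following methods:
run(): the method that represents the thread's activity.
start(): starts the thread's activity.
join([timeout]): waits until the thread terminates. It blocks the calling thread until the thread whose join() method is called terminates, either normally or through an unhandled exception, or until the optional timeout expires.
is_alive(): returns whether the thread is alive. (The old isAlive() spelling was removed in Python 3.9.)
getName(): returns the thread's name.
setName(): sets the thread's name. Since Python 3.10, getName(), setName(), currentThread() and activeCount() are deprecated; read or assign the name attribute instead of calling getName()/setName().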
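A small sketch of the functions listed above, using the current snake_case names (current_thread(), enumerate(), active_count(), is_alive(), and the name attribute). The helper inspect_threads() and the thread name "worker-1" are hypothetical, chosen only for this illustration; two Events keep the worker alive while the main thread inspects it.

```python
import threading


def inspect_threads():
    """Start one worker and report what the threading module sees while it runs."""
    started = threading.Event()
    release = threading.Event()

    def worker():
        started.set()    # tell the main thread we are running
        release.wait()   # stay alive until the main thread lets us finish

    t = threading.Thread(target=worker, name="worker-1")
    t.start()
    started.wait()

    info = {
        "current": threading.current_thread().name,  # the caller, normally "MainThread"
        "alive_names": [th.name for th in threading.enumerate()],
        "count_matches": threading.active_count() == len(threading.enumerate()),
        "worker_alive": t.is_alive(),
    }
    release.set()
    t.join()
    info["worker_alive_after_join"] = t.is_alive()
    return info


if __name__ == "__main__":
    print(inspect_threads())
```

While the worker is blocked on release.wait(), it appears in threading.enumerate(); after join() returns, is_alive() reports False.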

1.4 The multiprocessing Module

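multiprocessing is a cross-platform module for working with multiple processes. It provides a Process class that represents a process object. To create a child process, you only need to pass the function to execute and its arguments when creating a Process instance.
Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
group: unused; it should always be None (it exists for compatibility with threading.Thread).
target: the callable that the child process runs, i.e. the child's task.
name: a name for the child process.
args: the positional arguments for target, as a tuple. A one-element tuple needs a trailing comma, e.g. args=('monicx',).
kwargs: the keyword arguments for target, as a dict, e.g. kwargs={'name': 'monicx', 'age': 18}.
Process provides the following methods:
start(): starts the process; the child process then calls its run() method.
run(): the method executed when the process starts. The default run() calls the function given as target; if you subclass Process, override run() with the child's work.
terminate(): forcibly terminates the process without any cleanup (exit handlers and finally clauses are not run). If the process has created child processes of its own, they are not terminated and become orphans; if the process holds a lock, the lock is not released, which can deadlock other processes.
is_alive(): returns whether the process is still alive.
join([timeout]): makes the calling (parent) process wait until the child process exits before continuing. timeout is an optional limit in seconds; once it expires, the parent stops waiting (the child keeps running).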
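A minimal sketch of passing args and kwargs to a child process, as described above. greet() and run_child() are hypothetical helpers; because the child has its own memory, the result is sent back to the parent through a multiprocessing.Queue. The `if __name__ == "__main__":` guard is needed on platforms that start children with the spawn method (Windows, macOS).

```python
from multiprocessing import Process, Queue


def greet(out, name, age=0):
    # Runs in the child process; the string travels back to the parent via the queue
    out.put(f"{name} is {age}")


def run_child(name, age):
    out = Queue()
    # args is a tuple of positional arguments, kwargs a dict of keyword arguments
    p = Process(target=greet, args=(out, name), kwargs={"age": age})
    p.start()
    message = out.get()  # read the result before join() so the child can flush the queue
    p.join()             # wait for the child to exit
    return message, p.exitcode


if __name__ == "__main__":
    print(run_child("monicx", 18))  # ('monicx is 18', 0)
```

An exit code of 0 means the child returned normally from greet().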

1.5 The Global Interpreter Lock (GIL)

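Although Python provides threading modules, CPython (the standard interpreter) cannot run Python code in several threads truly in parallel, so multithreading is not recommended when the goal is to speed up CPU-bound code. CPython has a Global Interpreter Lock (GIL), which ensures that only one thread executes Python bytecode at any moment. The interpreter switches between threads so quickly that they appear to run in parallel, but they actually take turns, and acquiring and releasing the GIL adds overhead to thread execution.
The GIL is not a feature of the Python language; it is a mechanism introduced by the CPython interpreter implementation. Python has several implementations, such as CPython, PyPy, and Jython; Jython, for example, has no GIL. CPython is the default implementation in most environments, but the language itself does not depend on the GIL.
Because the GIL lets only one thread run at a time, CPython threads cannot take advantage of multiple CPU cores for Python code. The GIL is essentially a mutex: like any mutex, it turns concurrent execution into serial execution so that shared data can be modified by only one task at a time, which keeps the data safe. A Python process runs not only the main thread and the threads it starts, but also interpreter-level work such as garbage collection. All data in the process is shared, including the interpreter's own code. A thread must first obtain permission to run the interpreter and then hands its target code to the interpreter for execution; since garbage collection also runs through the same shared interpreter, a lock is needed to keep the interpreter's internal state consistent, and that lock is the GIL.
Because of the GIL, only one thread in a process executes Python code at any moment. Multiple cores can perform computations in parallel, so they raise computing performance, but a task blocked on I/O still has to wait, so extra cores bring little benefit to I/O-bound tasks. CPython releases the GIL while a thread is blocked on I/O (or in time.sleep()), so other threads can run during the wait. Choose the approach by workload: for CPU-bound tasks, multiprocessing has the advantage; for I/O-bound tasks, multithreading has the advantage.
CPU-bound task, multiprocessing version: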

# -*- coding:utf-8 -*-
from multiprocessing import Process
import os
import time

def work():
    result = 0
    for x in range(100000000):
        result *= x

if __name__ == "__main__":
    processes = []
    print("CPU: ", os.cpu_count())
    start = time.time()
    for i in range(4):
        p = Process(target=work)
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    end = time.time()
    print("CPU-bound task, multiprocessing took %s s" % (end - start))

# output:
# CPU:  4
# CPU-bound task, multiprocessing took 9.485123872756958 s

CPU-bound task, multithreading version:

# -*- coding:utf-8 -*-
from threading import Thread
import os, time

def work():
    res = 0
    for x in range(100000000):
        res *= x

if __name__ == "__main__":
    threads = []
    print("CPU: ",os.cpu_count())
    start = time.time()
    for i in range(4):
        thread = Thread(target=work)  # one thread per task
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()
    end = time.time()
    print("CPU-bound task, multithreading took %s s" % (end - start))

# output:
# CPU:  4
# CPU-bound task, multithreading took 18.434288501739502 s

I/O-bound task, multiprocessing version:

# -*- coding:utf-8 -*-
from multiprocessing import Process
import os, time

def work():
    time.sleep(2)
    with open("tmp.txt", "w") as f:  # close the file instead of leaking the handle
        print("hello,Python----------------------------------------------------", file=f)

if __name__ == "__main__":
    processes = []
    print("CPU: ", os.cpu_count())
    start = time.time()
    for i in range(400):
        p = Process(target=work)  # one process per task
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    stop = time.time()
    print("I/O-bound task, multiprocessing took %s s" % (stop - start))

# output:
# CPU:  4
# I/O-bound task, multiprocessing took 2.8894519805908203 s

I/O-bound task, multithreading version:

# -*- coding:utf-8 -*-
from threading import Thread
import os, time

def work():
    time.sleep(2)
    with open("tmp.txt", "w") as f:  # close the file instead of leaking the handle
        print("hello,Python----------------------------------------------------", file=f)

if __name__ == "__main__":
    threads = []
    print("CPU: ", os.cpu_count())
    start = time.time()

    for x in range(400):
        thread = Thread(target=work)
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()
    end = time.time()
    print("I/O-bound task, multithreading took %s s" % (end - start))

# output:
# CPU:  4
# I/O-bound task, multithreading took 2.044438362121582 s

2. Creating Threads

2.1 Instantiating threading.Thread

The threading.Thread constructor is defined as follows:

def __init__(self, group=None, target=None, name=None,
             args=(), kwargs=None, *, daemon=None):

Create a threading.Thread instance and call its start() method:

# -*- coding:utf-8 -*-
import time
import threading

def work_task(counter):
    print("%s %s" % (threading.current_thread().name, time.ctime(time.time())))
    n = counter
    while n > 0:
        time.sleep(1)
        n -= 1

if __name__ == "__main__":
    print("main thread start:", time.strftime("%Y-%m-%d %H:%M:%S"))

    threads = []
    for x in range(10):
        thread = threading.Thread(target=work_task, args=(x, ))
        threads.append(thread)

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

    print("main thread end:", time.strftime("%Y-%m-%d %H:%M:%S"))

# output:
# main thread start: 2019-07-03 21:49:58
# Thread-1 Wed Jul  3 21:49:58 2019
# Thread-2 Wed Jul  3 21:49:58 2019
# Thread-3 Wed Jul  3 21:49:58 2019
# Thread-4 Wed Jul  3 21:49:58 2019
# Thread-5 Wed Jul  3 21:49:58 2019
# Thread-6 Wed Jul  3 21:49:58 2019
# Thread-7 Wed Jul  3 21:49:58 2019
# Thread-8 Wed Jul  3 21:49:58 2019
# Thread-9 Wed Jul  3 21:49:58 2019
# Thread-10 Wed Jul  3 21:49:58 2019
# main thread end: 2019-07-03 21:50:07

2.2 Subclassing threading.Thread

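You can also create a new class that inherits directly from threading.Thread, override its __init__() and run() methods, and then call start() on an instance to launch the new thread; start() internally calls the thread's run() method. If you override __init__(), call the base-class constructor (threading.Thread.__init__(self) or super().__init__()) before doing anything else.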

# -*- coding:utf-8 -*-
import threading
import time

class WorkThread(threading.Thread):
    def __init__(self, thread_id, name):
        threading.Thread.__init__(self)
        self.thread_id = thread_id
        self.name = name

    def run(self):
        print("start thread: ", self.name)
        work(self.name, self.thread_id)
        print("end thread: ", self.name)

def work(thread_name, thread_id):
    print("%s %s %s" % (thread_name, thread_id, time.ctime(time.time())))
    i = 0
    while i < 2:
        i += 1
        time.sleep(1)

if __name__ == '__main__':
    thread1 = WorkThread(1, "Thread1")
    thread2 = WorkThread(2, "Thread2")

    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()

    print("exit main thread")

# output:
# start thread:  Thread1
# Thread1 1 Tue Jul  2 20:39:42 2019
# start thread:  Thread2
# Thread2 2 Tue Jul  2 20:39:42 2019
# end thread:  end thread: Thread1
#  Thread2
# exit main thread

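If a function needs to be passed in from outside, store it and its arguments as attributes of the thread instance and call it from the run() method: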
self.target = target
self.args = args

# -*- coding:utf-8 -*-
import threading
import time

class WorkThread(threading.Thread):
    def __init__(self, target, args):
        threading.Thread.__init__(self)
        self.target = target
        self.args = args

    def run(self):
        print("start thread: ", self.name)
        self.target(*self.args)
        print("end thread: ", self.name)

def work_task(counter):
    time.sleep(1)
    print("%s %s" % (threading.current_thread().name, time.ctime(time.time())))
    i = counter
    while i > 0:
        i -= 1

if __name__ == '__main__':
    print("main thread start:", time.strftime("%Y-%m-%d %H:%M:%S"))

    threads = []
    for x in range(10):
        # Use the WorkThread subclass defined above, passing the function and its arguments
        thread = WorkThread(work_task, (x,))
        threads.append(thread)

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

    print("main thread end:", time.strftime("%Y-%m-%d %H:%M:%S"))

# output (abridged; the interleaving of lines differs from run to run):
# main thread start: <start time>
# start thread:  Thread-1
# start thread:  Thread-2
# ...
# Thread-1 <about 1 s after the start>
# end thread:  Thread-1
# ...
# main thread end: <about 1 s after the start>

2.3 start() vs. run()

import threading
import time

def work_task(counter):
    n = counter
    while n > 0:
        n -= 1
        print("thread name: %s, id: %s" % (threading.current_thread().name, threading.current_thread().ident))

if __name__ == "__main__":
    print("main thread start")
    thread1 = threading.Thread(target=work_task, args=(5,))
    thread2 = threading.Thread(target=work_task, args=(5,))
    thread1.start()
    thread2.start()

    print("main thread end")

# output:
# main thread start
# thread name: Thread-1, id: 139926959064832thread name: Thread-2, id: 139926880384768main thread end
#
#
# thread name: Thread-1, id: 139926959064832
# thread name: Thread-2, id: 139926880384768thread name: Thread-1, id: 139926959064832
#
# thread name: Thread-1, id: 139926959064832
# thread name: Thread-2, id: 139926880384768thread name: Thread-1, id: 139926959064832
#
# thread name: Thread-2, id: 139926880384768
# thread name: Thread-2, id: 139926880384768

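Calling start() launches two new threads that run interleaved, each with its own thread ID (ident). The printed name is the value passed as name="xxxx" when the Thread object was created; if no name is given, Python assigns a default name of the form Thread-N. Note also that "main thread end" is printed while the child threads are still running.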

import threading
import time

def work_task(counter):
    n = counter
    while n > 0:
        n -= 1
        print("thread name: %s, id: %s" % (threading.current_thread().name, threading.current_thread().ident))

if __name__ == "__main__":
    print("main thread start")
    thread1 = threading.Thread(target=work_task, args=(5,))
    thread2 = threading.Thread(target=work_task, args=(5,))
    thread1.run()
    thread2.run()

    print("main thread end")

# output:
# main thread start
# thread name: MainThread, id: 140683421988672
# thread name: MainThread, id: 140683421988672
# thread name: MainThread, id: 140683421988672
# thread name: MainThread, id: 140683421988672
# thread name: MainThread, id: 140683421988672
# thread name: MainThread, id: 140683421988672
# thread name: MainThread, id: 140683421988672
# thread name: MainThread, id: 140683421988672
# thread name: MainThread, id: 140683421988672
# thread name: MainThread, id: 140683421988672
# main thread end

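When run() is called directly, the printed thread name is MainThread, i.e. the main thread. Although run() is called on both thread objects, thread1.run() runs to completion first and only then does thread2.run() execute; both run in the main thread and no new thread is started. In other words, run() is just an ordinary method call; only start() creates a new thread.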
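A small sketch that checks the difference programmatically instead of by reading printed names: the target records threading.get_ident() of whichever thread executes it, and the result is compared with the caller's ident. The function where_does_it_run() is a hypothetical helper for this illustration.

```python
import threading


def where_does_it_run(use_start):
    """Return True if the target ran on the calling thread, False otherwise."""
    caller = threading.get_ident()
    seen = []

    def target():
        seen.append(threading.get_ident())  # record which thread executes the body

    t = threading.Thread(target=target, name="demo")
    if use_start:
        t.start()  # a new OS thread runs target()
        t.join()
    else:
        t.run()    # plain method call: target() runs right here, in the caller
    return seen[0] == caller


if __name__ == "__main__":
    print("run():   ran in caller thread ->", where_does_it_run(False))  # True
    print("start(): ran in caller thread ->", where_does_it_run(True))   # False
```

A fresh Thread object is created on each call because a Thread can be started only once.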

2.4 The join() Method

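When a process starts, it gets a main thread by default, because a thread is the smallest unit of program execution; with multithreading, the main thread creates additional child threads. By default in Python, when the main thread finishes its own work, the child threads keep running until their tasks are done, and the interpreter waits for all non-daemon threads before the process exits. In the example below, "main thread end" is printed while the child threads are still working.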

import threading
import time

def work_task(counter):
    n = counter
    while n > 0:
        n -= 1
        print("thread name: %s, id: %s" % (threading.current_thread().name, threading.current_thread().ident))

if __name__ == "__main__":
    print("main thread start")
    threads = []
    for x in range(5):
        thread = threading.Thread(target=work_task, args=(5,))
        threads.append(thread)
    for thread in threads:
        thread.start()

    print("main thread end")

# output:
# main thread start
# thread name: Thread-1, id: 140306042726144thread name: Thread-2, id: 140306034333440
# thread name: Thread-2, id: 140306034333440
# thread name: Thread-2, id: 140306034333440
# thread name: Thread-2, id: 140306034333440
# thread name: Thread-2, id: 140306034333440
# thread name: Thread-3, id: 140306025940736
#
# thread name: Thread-3, id: 140306025940736
# thread name: Thread-3, id: 140306025940736
# thread name: Thread-1, id: 140306042726144thread name: Thread-3, id: 140306025940736
# thread name: Thread-3, id: 140306025940736
#
# thread name: Thread-1, id: 140306042726144
# thread name: Thread-1, id: 140306042726144
# thread name: Thread-1, id: 140306042726144
# thread name: Thread-4, id: 140306034333440
# thread name: Thread-4, id: 140306034333440
# thread name: Thread-5, id: 140306042726144thread name: Thread-4, id: 140306034333440
# main thread endthread name: Thread-4, id: 140306034333440
# thread name: Thread-4, id: 140306034333440
#
# thread name: Thread-5, id: 140306042726144
#
# thread name: Thread-5, id: 140306042726144
# thread name: Thread-5, id: 140306042726144
# thread name: Thread-5, id: 140306042726144

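If the child threads are marked as daemon threads (thread.daemon = True or daemon=True in the constructor; the older setDaemon(True) still works but has been deprecated since Python 3.10), then once the main thread finishes, the process exits and all remaining daemon threads are terminated immediately, so a child thread may be stopped before its task is complete. The daemon flag must be set before the thread is started.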

import threading
import time

def work_task(counter):
    n = counter
    time.sleep(1)
    while n > 0:
        n -= 1
        print("thread name: %s, id: %s" % (threading.current_thread().name, threading.current_thread().ident))

if __name__ == "__main__":
    print("main thread start")
    threads = []
    for x in range(5):
        thread = threading.Thread(target=work_task, args=(5,))
        threads.append(thread)
    for thread in threads:
        thread.daemon = True  # must be set before start()
        thread.start()

    print("main thread end")

# output:
# main thread start
# main thread end

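join() makes the calling thread (here the main thread) wait until the child thread has finished before running the rest of its code; until the child finishes, the caller stays blocked. Note that join() always returns None: it does not hand back the thread's result. In the example below, join() is called right after each start(), so the threads run one after another rather than concurrently, and all of them finish even though they are daemon threads.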

import threading
import time

def work_task(counter):
    n = counter
    time.sleep(1)
    while n > 0:
        n -= 1
        print("thread name: %s" % threading.current_thread().name)

if __name__ == "__main__":
    print("main thread start")
    threads = []
    for x in range(5):
        thread = threading.Thread(target=work_task, args=(5,))
        threads.append(thread)
    for thread in threads:
        thread.daemon = True  # must be set before start()
        thread.start()
        thread.join()

    print("main thread end")

# output:
# main thread start
# thread name: Thread-1
# thread name: Thread-1
# thread name: Thread-1
# thread name: Thread-1
# thread name: Thread-1
# thread name: Thread-2
# thread name: Thread-2
# thread name: Thread-2
# thread name: Thread-2
# thread name: Thread-2
# thread name: Thread-3
# thread name: Thread-3
# thread name: Thread-3
# thread name: Thread-3
# thread name: Thread-3
# thread name: Thread-4
# thread name: Thread-4
# thread name: Thread-4
# thread name: Thread-4
# thread name: Thread-4
# thread name: Thread-5
# thread name: Thread-5
# thread name: Thread-5
# thread name: Thread-5
# thread name: Thread-5
# main thread end
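The example above joins each thread immediately after starting it, which serializes the work. A minimal sketch comparing that with the usual pattern of starting every thread first and joining them afterwards; run_sleepers() is a hypothetical helper, and time.sleep() stands in for real work.

```python
import threading
import time


def run_sleepers(n, delay, join_each_immediately):
    """Start n threads that each sleep `delay` seconds; return the elapsed time."""
    threads = [threading.Thread(target=time.sleep, args=(delay,)) for _ in range(n)]
    start = time.perf_counter()
    if join_each_immediately:
        for t in threads:  # pattern used above: start, then join right away
            t.start()
            t.join()
    else:
        for t in threads:  # start everything first ...
            t.start()
        for t in threads:  # ... then wait for all of them
            t.join()
    return time.perf_counter() - start


if __name__ == "__main__":
    print("serial :", round(run_sleepers(5, 0.2, True), 2))   # roughly 5 * 0.2 s
    print("overlap:", round(run_sleepers(5, 0.2, False), 2))  # roughly 0.2 s
```

The exact timings depend on the machine; the point is that the first variant takes about the sum of the delays and the second about the longest single delay.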

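join() takes an optional timeout argument. join(timeout) waits at most timeout seconds for the thread and then returns; it never kills the thread. When the child threads are daemon threads, the main thread gives each of them up to timeout seconds; once the main thread finishes, the process exits and any daemon threads still running are killed, whether or not their tasks are done. With several child threads joined one after another, the total waiting time is at most the sum of the individual timeouts.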

import threading
import time

def work_task(counter):
    print("thread name: %s work task start" % threading.current_thread().name)
    n = counter
    time.sleep(4)
    while n > 0:
        n -= 1
    else:
        print("thread name: %s work task end" % threading.current_thread().name)

if __name__ == "__main__":
    print("main thread start")
    threads = []
    for x in range(5):
        thread = threading.Thread(target=work_task, args=(5,))
        threads.append(thread)

    for x in range(5):
        threads[x].daemon = True  # must be set before start()
        threads[x].start()
        threads[x].join(1)

    print("main thread end")

# output:
# main thread start
# thread name: Thread-1 work task start
# thread name: Thread-2 work task start
# thread name: Thread-3 work task start
# thread name: Thread-4 work task start
# thread name: Thread-5 work task start
# thread name: Thread-1 work task end
# main thread end

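Without daemon threads, the main thread still waits at most the sum of the timeouts and then finishes, but the child threads are not killed: they keep running until all of them are done, and only then does the program exit.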

import threading
import time

def work_task(counter):
    print("thread name: %s work task start" % threading.current_thread().name)
    n = counter
    time.sleep(4)
    while n > 0:
        n -= 1
    else:
        print("thread name: %s work task end" % threading.current_thread().name)

if __name__ == "__main__":
    print("main thread start")
    threads = []
    for x in range(5):
        thread = threading.Thread(target=work_task, args=(5,))
        threads.append(thread)

    for x in range(5):
        threads[x].start()
        threads[x].join(1)

    print("main thread end")

# output:
# main thread start
# thread name: Thread-1 work task start
# thread name: Thread-2 work task start
# thread name: Thread-3 work task start
# thread name: Thread-4 work task start
# thread name: Thread-5 work task start
# thread name: Thread-1 work task end
# main thread end
# thread name: Thread-2 work task end
# thread name: Thread-3 work task end
# thread name: Thread-4 work task end
# thread name: Thread-5 work task end
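Because join() returns None whether the thread finished or the timeout expired, the usual way to tell the two apart is to call is_alive() right after join(timeout). A minimal sketch; join_with_timeout() is a hypothetical helper and time.sleep() stands in for the thread's work.

```python
import threading
import time


def join_with_timeout(work_seconds, timeout):
    """Return (alive_after_timeout, alive_after_full_join)."""
    t = threading.Thread(target=time.sleep, args=(work_seconds,))
    t.start()
    t.join(timeout)               # returns after at most `timeout` seconds; never kills t
    still_running = t.is_alive()  # True means join() returned because of the timeout
    t.join()                      # now wait for the thread to really finish
    return still_running, t.is_alive()


if __name__ == "__main__":
    print(join_with_timeout(0.5, 0.1))   # (True, False): the timeout expired first
    print(join_with_timeout(0.05, 1.0))  # (False, False): the thread finished first
```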

3. Thread Synchronization

When several threads modify the same data, the result can be unpredictable. To keep the data correct, the threads must be synchronized.

3.1 Mutex Locks

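The Lock and RLock classes of the threading module provide simple thread synchronization. Both have acquire() and release() methods; any operation on data that only one thread at a time may touch must be placed between acquire() and release(). An RLock (re-entrant lock) can additionally be acquired again by the thread that already holds it, as long as that thread releases it the same number of times.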

# -*- coding:utf-8 -*-
import threading
import time

class WorkThread(threading.Thread):
    def __init__(self, thread_id, name):
        threading.Thread.__init__(self)
        self.thread_id = thread_id
        self.name = name

    def run(self):
        thread_locker.acquire()
        print("start thread: ", self.name)
        work(self.name, self.thread_id)
        print("end thread: ", self.name)
        thread_locker.release()

def work(thread_name, thread_id):
    print("%s %s %s" % (thread_name, thread_id, time.ctime(time.time())))
    i = 0
    while i < 2:
        i += 1
        time.sleep(1)

thread_locker = threading.Lock()
threads = []

if __name__ == '__main__':
    thread1 = WorkThread(1, "Thread1")
    thread2 = WorkThread(2, "Thread2")

    thread1.start()
    thread2.start()

    threads.append(thread1)
    threads.append(thread2)

    for t in threads:
        t.join()

    print("exit main thread")

# output:
# start thread:  Thread1
# Thread1 1 Tue Jul  2 20:48:05 2019
# end thread:  Thread1
# start thread:  Thread2
# Thread2 2 Tue Jul  2 20:48:07 2019
# end thread:  Thread2
# exit main thread
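The example above calls acquire() and release() explicitly. A minimal sketch of the same idea using the with statement, which releases the lock even if the protected code raises, plus an RLock being re-entered by the thread that holds it. The names add_many(), run_counter(), outer() and inner() are hypothetical.

```python
import threading

counter = 0
counter_lock = threading.Lock()


def add_many(times):
    global counter
    for _ in range(times):
        with counter_lock:  # acquire() on entry, release() on exit
            counter += 1    # the read-modify-write now happens one thread at a time


def run_counter(n_threads=4, times=10_000):
    global counter
    counter = 0
    threads = [threading.Thread(target=add_many, args=(times,)) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter


rlock = threading.RLock()


def outer():
    with rlock:             # first acquisition by this thread
        return inner()


def inner():
    with rlock:             # same thread acquires again: fine for RLock, a plain Lock would deadlock
        return "re-entered"


if __name__ == "__main__":
    print(run_counter())    # 40000
    print(outer())          # re-entered
```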

3.2 Semaphores

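A mutex lets only one thread access shared data at a time, whereas a semaphore lets a fixed number of threads access it at the same time. Think of a bank with 5 counters: 5 customers can be served at once, and everyone else waits until someone finishes before stepping up to a counter. The BoundedSemaphore used below is a semaphore that raises ValueError if release() would raise its value above the initial value, which helps catch unbalanced acquire()/release() calls.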

# -*- coding:utf-8 -*-
import threading
import time

semaphore = threading.BoundedSemaphore(5)
threads = []

def do_work(name):
    semaphore.acquire()
    time.sleep(2)
    print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} {threading.current_thread().name} is carrying on business")
    semaphore.release()

if __name__ == '__main__':
    print("main thread start:", time.strftime("%Y-%m-%d %H:%M:%S"))

    for i in range(10):
        t = threading.Thread(target=do_work, args=(i,))
        threads.append(t)

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

    print("main thread end:", time.strftime("%Y-%m-%d %H:%M:%S"))

# output:
# main thread start: 2019-07-03 22:31:06
# 2019-07-03 22:31:08 Thread-1 is carrying on business
# 2019-07-03 22:31:08 Thread-3 is carrying on business
# 2019-07-03 22:31:08 Thread-2 is carrying on business
# 2019-07-03 22:31:08 Thread-4 is carrying on business
# 2019-07-03 22:31:08 Thread-5 is carrying on business
# 2019-07-03 22:31:10 Thread-6 is carrying on business
# 2019-07-03 22:31:10 Thread-7 is carrying on business
# 2019-07-03 22:31:10 Thread-9 is carrying on business
# 2019-07-03 22:31:10 Thread-8 is carrying on business
# 2019-07-03 22:31:10 Thread-10 is carrying on business
# main thread end: 2019-07-03 22:31:10
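A minimal sketch that measures the semaphore's effect instead of reading timestamps: each worker records how many workers are inside the guarded section at once, and a second helper shows BoundedSemaphore rejecting an unmatched release(). peak_concurrency() and over_release() are hypothetical helpers.

```python
import threading
import time


def peak_concurrency(limit, n_threads):
    """Run n_threads workers behind a semaphore; return the peak number inside at once."""
    sem = threading.BoundedSemaphore(limit)
    guard = threading.Lock()  # protects the two counters below
    inside = 0
    peak = 0

    def worker():
        nonlocal inside, peak
        with sem:             # at most `limit` workers get past this point together
            with guard:
                inside += 1
                peak = max(peak, inside)
            time.sleep(0.05)  # simulate serving one customer at a counter
            with guard:
                inside -= 1

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak


def over_release():
    """BoundedSemaphore refuses a release() that would exceed its initial value."""
    sem = threading.BoundedSemaphore(1)
    try:
        sem.release()         # no matching acquire()
    except ValueError:
        return "ValueError"
    return "no error"


if __name__ == "__main__":
    print(peak_concurrency(3, 10))  # never more than 3
    print(over_release())           # ValueError
```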

3.3 Condition Variables

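A condition variable lets thread A pause and wait for another thread B; once B has made some condition true, it notifies thread A to continue. A thread first acquires the condition's lock; if the condition is not satisfied, it waits (wait()), which releases the lock. If the condition is satisfied, the thread carries on, and it can also notify other threads that are waiting. A waiting thread that receives the notification re-acquires the lock and checks the condition again.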

import threading
import time

class ThreadA(threading.Thread):
    def __init__(self, con, name):
        super(ThreadA, self).__init__()
        self.cond = con
        self.name = name

    def run(self):
        self.cond.acquire()
        print(self.name + ": What can I do for you?")
        self.cond.notify()
        self.cond.wait()
        print(self.name + ": Five yuan.")
        self.cond.notify()
        self.cond.wait()
        print(self.name + ": You are welcome.")
        self.cond.release()

class ThreadB(threading.Thread):
    def __init__(self, con, name):
        super(ThreadB, self).__init__()
        self.cond = con
        self.name = name

    def run(self):
        self.cond.acquire()
        time.sleep(1)
        print(self.name + ": A hot dog, please!")
        self.cond.notify()
        self.cond.wait()
        print(self.name + ": Thanks.")
        self.cond.notify()
        self.cond.release()

if __name__ == "__main__":
    cond = threading.Condition()
    thread1 = ThreadA(cond, "ThreadA")
    thread2 = ThreadB(cond, "ThreadB")
    thread1.start()
    thread2.start()

# output:
# ThreadA: What can I do for you?
# ThreadB: A hot dog, please!
# ThreadA: Five yuan.
# ThreadB: Thanks.
# ThreadA: You are welcome.
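The dialogue above works because ThreadA acquires the condition first; a notify() sent before the other thread is waiting is simply lost. In practice a Condition is usually paired with shared state and a predicate that is re-checked after every wake-up, which Condition.wait_for() does for you. A minimal sketch of that pattern; handoff() and its list buffer are hypothetical.

```python
import threading


def handoff(items):
    """Producer appends items to a shared list; consumer waits on a Condition for each one."""
    cond = threading.Condition()
    buffer = []
    received = []

    def consumer():
        for _ in range(len(items)):
            with cond:
                cond.wait_for(lambda: buffer)  # returns at once if data is already there
                received.append(buffer.pop(0))

    def producer():
        for item in items:
            with cond:
                buffer.append(item)
                cond.notify()                  # wake the waiting consumer, if any

    c = threading.Thread(target=consumer)
    p = threading.Thread(target=producer)
    c.start()
    p.start()
    p.join()
    c.join()
    return received


if __name__ == "__main__":
    print(handoff(["hot dog", "five yuan", "thanks"]))  # ['hot dog', 'five yuan', 'thanks']
```

Because the consumer checks the buffer rather than relying on the notification itself, it does not matter which thread starts first.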

3.4 Events

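Events are used for communication between threads: one thread sends a signal and one or more other threads wait for it. A thread that calls the event object's wait() method blocks until another thread calls set(); clear() resets the event so that later wait() calls block again. The example below uses a single Event for both directions and relies on time.sleep() to keep the two threads in order, so the ordering is timing-dependent rather than guaranteed.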

import threading
import time

class ThreadA(threading.Thread):
    def __init__(self, _event, name):
        super(ThreadA, self).__init__()
        self.event = _event
        self.name = name

    def run(self):
        print(self.name + ": What can I do for you?")
        self.event.set()
        time.sleep(0.5)
        self.event.wait()
        print(self.name + ": Five yuan.")
        self.event.set()
        time.sleep(0.5)
        self.event.wait()
        self.event.clear()
        print(self.name + ": You are welcome!")

class ThreadB(threading.Thread):
    def __init__(self, _event, name):
        super(ThreadB, self).__init__()
        self.event = _event
        self.name = name

    def run(self):
        self.event.wait()
        self.event.clear()
        print(self.name + ": A hot dog, please!")
        self.event.set()
        time.sleep(0.5)
        self.event.wait()
        print(self.name + ": Thanks!")
        self.event.set()

if __name__ == "__main__":
    event = threading.Event()
    thread1 = ThreadA(event, "ThreadA")
    thread2 = ThreadB(event, "ThreadB")
    thread1.start()
    thread2.start()

# output:
# ThreadA: What can I do for you?
# ThreadB: A hot dog, please!
# ThreadA: Five yuan.
# ThreadB: Thanks!
# ThreadA: You are welcome!
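A minimal sketch of the same dialogue without sleep(): one Event per direction, so each thread waits only for its own turn and the order is guaranteed. conversation() and the two event names are hypothetical.

```python
import threading


def conversation():
    """Alternate two threads strictly, using one Event per direction."""
    log = []
    a_turn = threading.Event()  # set when ThreadA may speak
    b_turn = threading.Event()  # set when ThreadB may speak

    def thread_a():
        for line in ["What can I do for you?", "Five yuan.", "You are welcome!"]:
            a_turn.wait()       # block until it is A's turn
            a_turn.clear()      # consume the signal before speaking
            log.append("ThreadA: " + line)
            b_turn.set()        # hand the turn to B

    def thread_b():
        for line in ["A hot dog, please!", "Thanks!"]:
            b_turn.wait()
            b_turn.clear()
            log.append("ThreadB: " + line)
            a_turn.set()

    ta = threading.Thread(target=thread_a)
    tb = threading.Thread(target=thread_b)
    ta.start()
    tb.start()
    a_turn.set()                # ThreadA speaks first
    ta.join()
    tb.join()
    return log


if __name__ == "__main__":
    print("\n".join(conversation()))
```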

3.5 Thread-Safe Queues

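Python's queue module provides synchronized, thread-safe queue classes: the FIFO (first-in, first-out) Queue, the LIFO (last-in, first-out) LifoQueue, and the priority queue PriorityQueue.
Queue, LifoQueue, and PriorityQueue all implement the necessary locking internally, so they can be used directly from multiple threads, and a queue can be used to synchronize threads.
Commonly used methods of the queue classes:
Queue.qsize(): returns the (approximate) size of the queue.
Queue.empty(): returns True if the queue is empty, otherwise False.
Queue.full(): returns True if the queue is full, otherwise False.
Queue.get([block[, timeout]]): removes and returns an item; timeout is how long to wait.
Queue.get_nowait(): equivalent to Queue.get(False).
Queue.put(item[, block[, timeout]]): puts an item into the queue; timeout is how long to wait.
Queue.put_nowait(item): equivalent to Queue.put(item, False).
Queue.task_done(): called after finishing one item, to tell the queue that this task is complete.
Queue.join(): blocks until every item put into the queue has been retrieved and processed (task_done() has been called for it).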
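A short single-threaded sketch of the order in which the three queue classes hand out items. drain() and ordering_demo() are hypothetical helpers; empty() is reliable here only because no other thread touches the queues, while in multithreaded code empty() and qsize() are just snapshots.

```python
import queue


def drain(q):
    """Remove everything currently in q, in the order the queue hands items out."""
    out = []
    while not q.empty():
        out.append(q.get_nowait())
    return out


def ordering_demo(items):
    fifo, lifo, prio = queue.Queue(), queue.LifoQueue(), queue.PriorityQueue()
    for item in items:
        fifo.put(item)
        lifo.put(item)
        prio.put(item)  # smallest item first; (priority, data) tuples are a common choice
    return drain(fifo), drain(lifo), drain(prio)


if __name__ == "__main__":
    print(ordering_demo([3, 1, 2]))  # ([3, 1, 2], [2, 1, 3], [1, 2, 3])
```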

# -*- coding:utf-8 -*-
import threading
import time
import queue

exitFlag = 0

class WorkThread(threading.Thread):
    def __init__(self, id, name, q):
        threading.Thread.__init__(self)
        self.thread_id = id
        self.name = name
        self.queue = q

    def run(self):
        work(self.name, self.queue)

def work(thread_name, q):
    while not exitFlag:
        thread_locker.acquire()
        if not q.empty():
            data = q.get()
            print("%s processing %s" % (thread_name, data))
            thread_locker.release()
        else:
            thread_locker.release()
        time.sleep(1)

thread_locker = threading.Lock()
thread_list = ["Thread1", "Thread2", "Thread3"]
work_queue = queue.Queue(10)
messages = ["one", "two", "three", "four", "five"]
threads = []
thread_id = 1

if __name__ == '__main__':
    # Create the worker threads
    for name in thread_list:
        thread = WorkThread(thread_id, name, work_queue)
        thread.start()
        threads.append(thread)
        thread_id += 1

    # Fill the queue
    thread_locker.acquire()
    for word in messages:
        work_queue.put(word)
    thread_locker.release()

    # Wait until the queue is empty (busy-waits)
    while not work_queue.empty():
        pass

    # Tell the threads it is time to exit
    exitFlag = 1

    # Wait for all threads to finish
    for t in threads:
        t.join()
    print("exit main thread")

# output:
# Thread1 processing one
# Thread3 processing two
# Thread2 processing three
# Thread1 processing four
# Thread3 processing five
# exit main thread
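The example above wraps the queue in an extra lock, busy-waits on empty(), and stops the workers with a global flag, although Queue already does its own locking. A minimal sketch of the more common pattern: workers block on get(), call task_done() for every item, the main thread waits with join(), and each worker is stopped by a None sentinel. The sentinel convention and process_all() are assumptions of this sketch, not part of the queue API.

```python
import queue
import threading


def process_all(messages, n_workers=3):
    """Hand messages to worker threads through a Queue; return what was processed."""
    q = queue.Queue()
    done = []
    done_lock = threading.Lock()

    def worker():
        while True:
            item = q.get()    # blocks until an item is available
            if item is None:  # sentinel: no more work for this worker
                q.task_done()
                break
            with done_lock:
                done.append(item.upper())
            q.task_done()     # tell q.join() that this item is finished

    workers = [threading.Thread(target=worker) for _ in range(n_workers)]
    for w in workers:
        w.start()
    for m in messages:
        q.put(m)
    for _ in workers:
        q.put(None)           # one sentinel per worker
    q.join()                  # returns once every put() item has been marked task_done()
    for w in workers:
        w.join()
    return sorted(done)


if __name__ == "__main__":
    print(process_all(["one", "two", "three", "four", "five"]))
```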

3.6 Deadlock

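A deadlock is a situation in which two or more processes or threads wait for each other forever while competing for resources. When threads share several resources and each holds part of them while waiting for the part the other holds, a deadlock occurs. For example, in a database operation, thread A needs thread B's result to proceed while thread B needs thread A's result; if A and B are both in the middle of their operations and neither has a result yet, they wait for each other indefinitely. In the example below, the first thread holds a's lock and waits for b's, while the second holds b's lock and waits for a's, so the program hangs after printing "wait for lock..." twice.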

import time
import threading

class Account:
    def __init__(self, _id, balance, lock):
        self.id = _id
        self.balance = balance
        self.lock = lock

    def withdraw(self, amount):
        self.balance -= amount

    def deposit(self, amount):
        self.balance += amount

def transfer(_from, to, amount):
    if _from.lock.acquire():
        _from.withdraw(amount)
        time.sleep(1)
        print('wait for lock...')
        if to.lock.acquire():
            to.deposit(amount)
            to.lock.release()
        _from.lock.release()
    print('finish...')

if __name__ == "__main__":
    a = Account('a', 1000, threading.Lock())
    b = Account('b', 1000, threading.Lock())
    threading.Thread(target=transfer, args=(a, b, 100)).start()
    threading.Thread(target=transfer, args=(b, a, 200)).start()

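One way to prevent this kind of deadlock is to give every lock in the program a unique id and allow multiple locks to be acquired only in ascending id order.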
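A minimal sketch of lock ordering applied to the transfer example: whatever the direction of the transfer, the lock of the account with the smaller id is always taken first, so the two opposite transfers can no longer wait on each other. The Account class mirrors the one above; run_transfers() is a hypothetical driver.

```python
import threading


class Account:
    def __init__(self, _id, balance):
        self.id = _id
        self.balance = balance
        self.lock = threading.Lock()


def transfer(_from, to, amount):
    # Assumes _from and to are different accounts.
    # Acquire locks in ascending id order, regardless of the transfer direction.
    first, second = sorted((_from, to), key=lambda acc: acc.id)
    with first.lock:
        with second.lock:
            _from.balance -= amount
            to.balance += amount


def run_transfers():
    a = Account("a", 1000)
    b = Account("b", 1000)
    threads = [threading.Thread(target=transfer, args=(a, b, 100)),
               threading.Thread(target=transfer, args=(b, a, 200))]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return a.balance, b.balance


if __name__ == "__main__":
    print(run_transfers())  # (1100, 900)
```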

4. Thread Pools

4.1 Introduction to Thread Pools

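A thread pool keeps a number of idle threads; the program simply submits a task function to the pool, and the pool runs it on an idle thread. When the task function finishes, the thread does not die; it returns to the pool as an idle thread and waits for the next task. (ThreadPoolExecutor actually creates its threads lazily, as tasks arrive, up to its maximum.)
A thread pool also gives effective control over the number of concurrent threads. A very large number of concurrent threads can sharply degrade system performance and can even crash the Python interpreter; the pool's maximum-thread setting keeps the number of concurrent threads from exceeding that limit.
In the concurrent.futures module, Executor is the abstract base class for pools. It has two subclasses: ThreadPoolExecutor, which creates thread pools, and ProcessPoolExecutor, which creates process pools.
Executor provides the following commonly used methods:
submit(fn, *args, **kwargs): submits fn to the pool. args are the positional arguments passed to fn (a tuple) and kwargs are its keyword arguments (a dict). submit() returns a Future object.
map(func, *iterables, timeout=None, chunksize=1): like the built-in map(), but func is called asynchronously on the pool's workers; it returns an iterator whose results come back in the same order as the inputs. chunksize only has an effect with ProcessPoolExecutor.
shutdown(wait=True): shuts down the pool.
Future provides the following methods:
cancel(): attempts to cancel the task the Future represents. If the task is currently running or has already finished, it cannot be cancelled and False is returned; otherwise the task is cancelled and True is returned.
cancelled(): returns True if the task was successfully cancelled.
running(): returns True if the task is currently running and therefore cannot be cancelled.
done(): returns True if the task was successfully cancelled or has finished running.
result(timeout=None): returns the task's return value. If the task has not finished yet, result() blocks the current thread; timeout is the maximum number of seconds to block. If the task raised an exception, result() re-raises it.
exception(timeout=None): returns the exception raised by the task, or None if the task completed successfully.
add_done_callback(fn): registers a callback fn for the task; fn is called with the Future as its only argument when the task finishes (normally or with an exception) or is cancelled. If the task is already done, fn is called immediately.
When you are done with a thread pool, call its shutdown() method to start the pool's shutdown sequence. After shutdown() is called, the pool accepts no new tasks, but it finishes all tasks already submitted; once they are all done, all of the pool's threads exit. Using the pool in a with statement calls shutdown(wait=True) automatically.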
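A minimal sketch of the interfaces listed above: submit() with as_completed(), map() returning results in input order, and exception() on a failing task, all inside a with block that shuts the pool down. square_after() and pool_demo() are hypothetical, and the short sleep stands in for I/O.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def square_after(n, delay=0.01):
    time.sleep(delay)  # stand-in for I/O such as a network call
    return n * n


def pool_demo():
    # Leaving the with block calls shutdown(wait=True), so all tasks are finished afterwards
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = {pool.submit(square_after, n): n for n in range(5)}
        # as_completed() yields futures as they finish, in no guaranteed order
        by_completion = {futures[f]: f.result() for f in as_completed(futures)}
        in_order = list(pool.map(square_after, range(5)))  # results follow input order
        failing = pool.submit(int, "not a number")
        error = failing.exception()  # waits for the task, then returns its ValueError
    return by_completion, in_order, type(error).__name__


if __name__ == "__main__":
    print(pool_demo())
```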

4.2 Thread Pools: ThreadPoolExecutor

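ThreadPoolExecutor(max_workers): if max_workers is not given explicitly, the default number of threads depends on the Python version: os.cpu_count() * 5 in Python 3.5 to 3.7, and min(32, os.cpu_count() + 4) from Python 3.8 on.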

# -*- coding:utf-8 -*-
from concurrent.futures import ThreadPoolExecutor
import threading
import time
import os

class WorkThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        print('Process[%s]:%s start and run task' % (os.getpid(), threading.current_thread().name))
        time.sleep(2)
        return "Process[{}]:{} end".format(os.getpid(), threading.current_thread().name)

def work_task(thread_name):
    print('Process[%s]:%s start and run task' % (os.getpid(), threading.current_thread().name))
    time.sleep(5)
    return "Process[{}]:{} end".format(os.getpid(), threading.current_thread().name)

def get_call_back(future):
    print(future.result())

if __name__ == '__main__':
    print('main thread start')

    # create thread pool
    thread_pool = ThreadPoolExecutor(5)
    futures = []
    for i in range(5):
        thread = WorkThread()
        future = thread_pool.submit(thread.run)
        futures.append(future)

    for i in range(5):
        future = thread_pool.submit(work_task, i)
        futures.append(future)

    for future in futures:
        future.add_done_callback(get_call_back)

    # thread_pool.map(work_task, (2, 3, 4))
    thread_pool.shutdown()

# output:
# main thread start
# Process[718]:ThreadPoolExecutor-0_0 start and run task
# Process[718]:ThreadPoolExecutor-0_1 start and run task
# Process[718]:ThreadPoolExecutor-0_2 start and run task
# Process[718]:ThreadPoolExecutor-0_3 start and run task
# Process[718]:ThreadPoolExecutor-0_4 start and run task
# Process[718]:ThreadPoolExecutor-0_3 end
# Process[718]:ThreadPoolExecutor-0_3 start and run task
# Process[718]:ThreadPoolExecutor-0_1 end
# Process[718]:ThreadPoolExecutor-0_1 start and run task
# Process[718]:ThreadPoolExecutor-0_2 end
# Process[718]:ThreadPoolExecutor-0_2 start and run task
# Process[718]:ThreadPoolExecutor-0_0 end
# Process[718]:ThreadPoolExecutor-0_0 start and run task
# Process[718]:ThreadPoolExecutor-0_4 end
# Process[718]:ThreadPoolExecutor-0_4 start and run task
# Process[718]:ThreadPoolExecutor-0_2 end
# Process[718]:ThreadPoolExecutor-0_3 end
# Process[718]:ThreadPoolExecutor-0_1 end
# Process[718]:ThreadPoolExecutor-0_4 end
# Process[718]:ThreadPoolExecutor-0_0 end

4.3 Process Pools: ProcessPoolExecutor

ProcessPoolExecutor(max_workers): if max_workers is not given explicitly, the process pool creates as many worker processes as the machine has CPUs (os.cpu_count()).
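Process pool, synchronous style (result() is called right after each submit(), so the tasks run one after another):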

from concurrent.futures import ProcessPoolExecutor
import os
import time
import random

def work_task(n):
    print('Process[%s] is running' % os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':
    start = time.time()
    pool = ProcessPoolExecutor()
    for i in range(5):
        obj = pool.submit(work_task, i).result()
    pool.shutdown()
    print('='*30)
    print("time: ", time.time() - start)

# output:
# Process[7372] is running
# Process[7373] is running
# Process[7374] is running
# Process[7375] is running
# Process[7372] is running
# ==============================
# time:  10.023026466369629

Process pool, asynchronous style (all tasks are submitted first and the results collected afterwards):

from concurrent.futures import ProcessPoolExecutor
import os
import time
import random

def work_task(n):
    print('Process[%s] is running' % os.getpid())
    time.sleep(random.randint(1, 3))
    return n**2

if __name__ == '__main__':
    start = time.time()
    pool = ProcessPoolExecutor()
    objs = []
    for i in range(5):
        obj = pool.submit(work_task, i)
        objs.append(obj)
    pool.shutdown()
    print('='*30)
    print([obj.result() for obj in objs])
    print("time: ", time.time() - start)

# output:
# Process[8268] is running
# Process[8269] is running
# Process[8270] is running
# Process[8271] is running
# Process[8270] is running
# ==============================
# [0, 1, 4, 9, 16]
# time:  2.0124566555023193

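5. The Producer-Consumer Model

Producers put work items into a shared Queue and consumers take them out; the queue decouples the two sides and does the locking. In the example below, two producers generate URLs and 30 consumers try to open them.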

import threading
from queue import Queue
from urllib.request import urlopen

ips = ["www.baidu.com",
       "www.taobao.com",
       "www.huawei.com",
       "www.alibaba.com",
       "www.meituan.com",
       "www.xiaomi.com"]
ports = [80, 443]

class Producer(threading.Thread):
    def __init__(self, _queue):
        super(Producer, self).__init__()
        self.queue = _queue

    def run(self):
        urls = ["http://%s:%s" % (ip, port) for ip in ips for port in ports]
        for url in urls:
            self.queue.put(url)

class Consumer(threading.Thread):
    def __init__(self, _queue):
        super(Consumer, self).__init__()
        self.queue = _queue

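    def run(self):
        # Keep taking URLs until the sentinel None arrives
        while True:
            url = self.queue.get()
            if url is None:
                break
            try:
                urlopen(url, timeout=5)  # timeout so a slow host cannot hang the consumer
            except Exception:
                print("%s is unknown url" % url)
            else:
                print("%s is ok" % url)

if __name__ == "__main__":
    # Create a shared queue
    queue = Queue()

    producers = [Producer(queue) for _ in range(2)]
    for producer in producers:
        producer.start()

    consumers = [Consumer(queue) for _ in range(30)]
    for consumer in consumers:
        consumer.start()

    # After the producers finish, send one sentinel per consumer; without this,
    # the consumers left over (30 consumers, 24 URLs) would block on get() forever
    for producer in producers:
        producer.join()
    for _ in consumers:
        queue.put(None)
    for consumer in consumers:
        consumer.join()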

# output:
# http://www.taobao.com:443 is unknown url
# http://www.huawei.com:443 is unknown url
# http://www.huawei.com:443 is unknown urlhttp://www.taobao.com:443 is unknown url
#
# http://www.baidu.com:80 is ok
# http://www.baidu.com:443 is unknown url
# http://www.xiaomi.com:443 is unknown url
# http://www.baidu.com:80 is ok
# http://www.baidu.com:443 is unknown url
# http://www.xiaomi.com:443 is unknown url
# http://www.alibaba.com:443 is unknown url
# http://www.alibaba.com:443 is unknown url
# http://www.meituan.com:443 is unknown urlhttp://www.meituan.com:443 is unknown url
#
# http://www.huawei.com:80 is ok
# http://www.huawei.com:80 is ok
# http://www.xiaomi.com:80 is ok
# http://www.xiaomi.com:80 is ok
# http://www.taobao.com:80 is ok
# http://www.meituan.com:80 is ok
# http://www.meituan.com:80 is ok
# http://www.taobao.com:80 is ok
# http://www.alibaba.com:80 is ok
# http://www.alibaba.com:80 is ok