Threads let a program run multiple operations concurrently within the same process space.
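Contents
- Thread objects: threading.Thread
- Determining the current thread: getName() / name
- Daemon vs. non-daemon threads: daemon
- Enumerating all threads: enumerate()
- Subclassing Thread
- Timer threads: Timer()
- Signaling between threads: Event
- Controlling access to resources: Lock
- Reentrant locks: RLock
- Locks as context managers: with lock
- Synchronizing threads: Condition, Barrier
- Limiting concurrent access to resources: Semaphore
- Thread-specific data: local()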
Using Thread
The simplest way to use a Thread is to instantiate it with a target function and call start() to let it begin working.
```python
import threading


def worker():
    """thread worker function"""
    print('Worker')


threads = []
for i in range(5):
    t = threading.Thread(target=worker)
    threads.append(t)
    t.start()
```
Output:
```
Worker
Worker
Worker
Worker
Worker
```
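You can create a thread and pass it arguments that tell it what work to do. Any type of object can be passed as an argument to a thread. This example passes a number, and the thread prints it.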
```python
import threading


def worker(num):
    """thread worker function"""
    print(f'Worker: {num}')


threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()
```
Output:
```
Worker: 0
Worker: 1
Worker: 2
Worker: 3
Worker: 4
```
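A thread's target cannot hand a return value back through start(). A common workaround is to pass a shared container as an argument and read it after join(). The sketch below also passes keyword arguments through kwargs. The names worker, results and scale are hypothetical and serve only as an illustration. Each thread writes to its own index, so no lock is needed.

```python
import threading


def worker(num, results, *, scale=1):
    """Hypothetical worker: store num * scale in its own slot of results."""
    results[num] = num * scale


results = [None] * 5
threads = []
for i in range(5):
    # args supplies positional arguments, kwargs supplies keyword arguments
    t = threading.Thread(target=worker, args=(i, results), kwargs={'scale': 10})
    threads.append(t)
    t.start()

# join() blocks until each thread has finished, so every slot is filled afterwards
for t in threads:
    t.join()

print(results)  # [0, 10, 20, 30, 40]
```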
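Using arguments to identify or name a thread is cumbersome and unnecessary. Every Thread instance has a name with a default value, and you can change it when the thread is created.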
```python
import threading
import time


def worker():
    # The .name attribute replaces getName(), which is deprecated since Python 3.10
    print(threading.current_thread().name, 'Starting')
    time.sleep(0.2)
    print(threading.current_thread().name, 'Exiting')


def my_service():
    print(threading.current_thread().name, 'Starting')
    time.sleep(0.3)
    print(threading.current_thread().name, 'Exiting')


t = threading.Thread(name='my_service', target=my_service)
w = threading.Thread(name='worker', target=worker)
w2 = threading.Thread(target=worker)  # use default name

w.start()
w2.start()
t.start()
```
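Output: "Thread-1" is the unnamed thread w2. On Python 3.10 and later, the default name also includes the target function's name, for example Thread-1 (worker).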
```
worker Starting
Thread-1 Starting
my_service Starting
worker Exiting
Thread-1 Exiting
my_service Exiting
```
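Most programs do not debug with print. The logging module can embed the thread name in every log message through the formatter code %(threadName)s. The logging module is also thread-safe.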
```python
import logging
import threading
import time


def worker():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')


def my_service():
    logging.debug('Starting')
    time.sleep(0.3)
    logging.debug('Exiting')


logging.basicConfig(
    level=logging.DEBUG,
    format='[%(levelname)s] (%(threadName)-10s) %(message)s',
)

t = threading.Thread(name='my_service', target=my_service)
w = threading.Thread(name='worker', target=worker)
w2 = threading.Thread(target=worker)  # use default name

w.start()
w2.start()
t.start()
```
Output:
```
[DEBUG] (worker    ) Starting
[DEBUG] (Thread-1  ) Starting
[DEBUG] (my_service) Starting
[DEBUG] (worker    ) Exiting
[DEBUG] (Thread-1  ) Exiting
[DEBUG] (my_service) Exiting
```
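Daemon thread: does not block the main program from exiting.
Non-daemon thread: blocks the main program, which waits for it to finish before exiting.

So far, every example program has waited for all of its threads to finish their work before exiting, because threads are non-daemon by default. Passing daemon=True marks a thread as a daemon, so the program can exit while that thread is still running.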
```python
import logging
import threading
import time


def daemon():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')


def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

d = threading.Thread(name='daemon', target=daemon, daemon=True)  # daemon: does not block the main program
t = threading.Thread(name='non-daemon', target=non_daemon)  # non-daemon by default: blocks the main program

d.start()
t.start()
```
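Output: d is a daemon thread, so the main program exits without waiting for d to finish. For that reason, d's "Exiting" message never appears.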
```
(daemon    ) Starting
(non-daemon) Starting
(non-daemon) Exiting
```
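The daemon flag can also be set as an attribute instead of a constructor argument, but only before start(). The sketch below checks three things. First, a thread created from the main thread starts out non-daemon, because the flag is inherited from the creating thread. Second, the flag can be changed before start(). Third, trying to change it afterward raises RuntimeError. The background() function is a hypothetical placeholder.

```python
import threading
import time


def background():
    """Hypothetical placeholder task."""
    time.sleep(0.05)


t = threading.Thread(target=background)
default_flag = t.daemon   # inherited from the creating thread: False for the main thread
t.daemon = True           # allowed: the thread has not been started yet
t.start()

try:
    t.daemon = False      # not allowed once start() has been called
    error = None
except RuntimeError as exc:
    error = exc

t.join()
print(default_flag, t.daemon, type(error).__name__)  # False True RuntimeError
```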
To make the main program wait for a daemon thread to finish its work before exiting, use join():
```python
import logging
import threading
import time


def daemon():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')


def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

d = threading.Thread(name='daemon', target=daemon, daemon=True)
t = threading.Thread(name='non-daemon', target=non_daemon)

d.start()
t.start()

d.join()  # wait for the daemon thread too
t.join()
```
Output:
```
(daemon    ) Starting
(non-daemon) Starting
(non-daemon) Exiting
(daemon    ) Exiting
```
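In addition, d.join(0.1) waits at most 0.1 seconds for d. join() always returns None, so call d.is_alive() afterward to tell whether the thread actually finished or the timeout expired.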
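A minimal sketch of a join with a timeout. It assumes the hypothetical slow() task (0.5 s) outlives the 0.05 s timeout, so the first is_alive() check normally reports True. After a plain join() the thread is always finished.

```python
import threading
import time


def slow():
    """Hypothetical task that outlives the join timeout."""
    time.sleep(0.5)


t = threading.Thread(target=slow, daemon=True)
t.start()

t.join(0.05)                  # gives up after about 0.05 s; join() itself returns None
alive_after_timeout = t.is_alive()

t.join()                      # no timeout: block until slow() returns
alive_after_join = t.is_alive()

print(alive_after_timeout, alive_after_join)
```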
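threading.enumerate() returns a list of the currently active Thread instances, including the main thread. Joining the current thread raises RuntimeError, so the example skips the main thread.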
```python
import logging
import random
import threading
import time


def worker():
    """thread worker function"""
    pause = random.randint(1, 5) / 10
    logging.debug('sleeping %0.2f', pause)
    time.sleep(pause)
    logging.debug('ending')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

for i in range(3):
    t = threading.Thread(target=worker, daemon=True)
    t.start()

main_thread = threading.main_thread()
for t in threading.enumerate():
    if t is main_thread:
        continue  # joining the current thread would raise RuntimeError
    logging.debug('joining %s', t.name)
    t.join()
```
Output:
```
(Thread-1  ) sleeping 0.40
(Thread-2  ) sleeping 0.50
(Thread-3  ) sleeping 0.10
(MainThread) joining Thread-1
(Thread-3  ) ending
(Thread-1  ) ending
(MainThread) joining Thread-2
(Thread-2  ) ending
(MainThread) joining Thread-3
```
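When it starts, a Thread does some basic initialization and then calls its run() method, which calls the target function passed to the constructor. To create a subclass of Thread, override run() to do whatever is necessary.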
```python
import logging
import threading


class MyThread(threading.Thread):

    def run(self):
        logging.debug('running')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

for i in range(5):
    t = MyThread()
    t.start()
```
Output:
```
(Thread-1  ) running
(Thread-2  ) running
(Thread-3  ) running
(Thread-4  ) running
(Thread-5  ) running
```
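To pass arguments to a subclass when instantiating it, override __init__() and store them as attributes that run() can read: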
```python
import logging
import threading


class MyThreadWithArgs(threading.Thread):

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None):
        super().__init__(group=group, target=target, name=name,
                         daemon=daemon)
        # Keep our own copies so run() can use them
        self.args = args
        self.kwargs = kwargs

    def run(self):
        logging.debug('running with %s and %s',
                      self.args, self.kwargs)


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

for i in range(5):
    t = MyThreadWithArgs(args=(i,), kwargs={'a': 'A', 'b': 'B'})
    t.start()
```
Output:
```
(Thread-1  ) running with (0,) and {'a': 'A', 'b': 'B'}
(Thread-2  ) running with (1,) and {'a': 'A', 'b': 'B'}
(Thread-3  ) running with (2,) and {'a': 'A', 'b': 'B'}
(Thread-4  ) running with (3,) and {'a': 'A', 'b': 'B'}
(Thread-5  ) running with (4,) and {'a': 'A', 'b': 'B'}
```
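A subclass can also keep the result of its work as an attribute, which the caller reads after join(). The sketch below is hypothetical. SumThread and its numbers/result attributes are invented for illustration and are not part of the threading API.

```python
import threading


class SumThread(threading.Thread):
    """Hypothetical subclass that computes a sum in run() and keeps the result."""

    def __init__(self, numbers, *, name=None):
        super().__init__(name=name)
        self.numbers = numbers
        self.result = None       # filled in by run()

    def run(self):
        self.result = sum(self.numbers)


threads = [SumThread(range(n), name=f'sum-{n}') for n in (10, 100)]
for t in threads:
    t.start()
for t in threads:
    t.join()                     # after join(), run() has completed

print([(t.name, t.result) for t in threads])  # [('sum-10', 45), ('sum-100', 4950)]
```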
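threading.Timer() creates a thread that runs a function after a delay. A Timer can be cancelled during that delay, and once cancelled, its function never runs.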
```python
import logging
import threading
import time


def delayed():
    logging.debug('worker running')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

t1 = threading.Timer(0.3, delayed)  # t1 runs delayed() after 0.3 s
t1.name = 't1'                      # replaces the deprecated setName()
t2 = threading.Timer(0.3, delayed)  # t2 would also run delayed() after 0.3 s
t2.name = 't2'

logging.debug('starting timers')
t1.start()
t2.start()

logging.debug('waiting before canceling %s', t2.name)
time.sleep(0.2)
logging.debug('canceling %s', t2.name)
t2.cancel()  # cancelled before its delay expired, so t2 never runs delayed()
logging.debug('done')
```
Output: t2 never runs.
```
(MainThread) starting timers
(MainThread) waiting before canceling t2
(MainThread) canceling t2
(MainThread) done
(t1        ) worker running
```
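Timer also accepts args and kwargs for the delayed function, in the form Timer(interval, function, args=None, kwargs=None). The sketch below uses a hypothetical record() function. It starts two timers, cancels one right away, and confirms that only the other one fired.

```python
import threading

calls = []


def record(label):
    """Hypothetical callback that remembers which timer fired."""
    calls.append(label)


kept = threading.Timer(0.05, record, args=('kept',))
cancelled = threading.Timer(0.05, record, args=('cancelled',))
kept.start()
cancelled.start()
cancelled.cancel()    # cancelled during the delay: record('cancelled') never runs

kept.join()
cancelled.join()      # a cancelled Timer thread still exits normally
print(calls)          # ['kept']
```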
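Although the point of using multiple threads is to run separate operations concurrently, there are times when it is important to synchronize the operations of two or more threads. Event objects are a simple way to communicate safely between threads.

A Python Event is typically used by one thread, often the main thread, to control the execution of other threads. An Event mainly provides three methods: wait(), clear(), and set().

An Event manages an internal flag that callers control with the set() and clear() methods. Other threads can use wait() to pause until the flag is set, which blocks their progress until they are allowed to continue. wait() returns the flag's state: True once it is set, or False if the optional timeout expires first.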
```python
import logging
import threading
import time


def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    logging.debug('wait_for_event starting')
    event_is_set = e.wait()
    logging.debug('event set: %s', event_is_set)


def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    while not e.is_set():
        logging.debug('wait_for_event_timeout starting')
        event_is_set = e.wait(t)
        logging.debug('event set: %s', event_is_set)
        if event_is_set:
            logging.debug('processing event')
        else:
            logging.debug('doing other work')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

e = threading.Event()
t1 = threading.Thread(
    name='block',
    target=wait_for_event,
    args=(e,),
)
t1.start()

t2 = threading.Thread(
    name='nonblock',
    target=wait_for_event_timeout,
    args=(e, 2),
)
t2.start()

logging.debug('Waiting before calling Event.set()')
time.sleep(0.3)
e.set()
logging.debug('Event is set')
```
Output:
```
(block     ) wait_for_event starting
(nonblock  ) wait_for_event_timeout starting
(MainThread) Waiting before calling Event.set()
(MainThread) Event is set
(nonblock  ) event set: True
(block     ) event set: True
(nonblock  ) processing event
```
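The sketch below demonstrates each return value of wait() on its own. A wait() with no set() times out and returns False. A wait() that another thread satisfies returns True. After clear(), the flag is down again. Here a Timer plays the role of the other thread.

```python
import threading

ready = threading.Event()

# Nobody has called set() yet, so wait() times out and returns False
timed_out = ready.wait(0.05)

# Another thread sets the flag after about 0.05 s; wait() wakes up and returns True
setter = threading.Timer(0.05, ready.set)
setter.start()
woke_up = ready.wait(2)
setter.join()

# clear() resets the flag, so subsequent wait() calls block again
ready.clear()
print(timed_out, woke_up, ready.is_set())  # False True False
```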
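In addition to synchronizing the operations of threads, you must also be able to control access to shared resources to prevent corrupted or lost data. In CPython, individual operations on the built-in data structures (lists, dicts, and so on) are thread-safe as a side effect of the global interpreter lock, which is not released in the middle of such an update. Other data structures implemented in Python get no such protection. The same is true of compound operations on simple types such as integers and floats, for example reading a value, adding 1, and storing it back. Use a Lock to guard against simultaneous access to an object.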
```python
import logging
import random
import threading
import time


class Counter:

    def __init__(self, start=0):
        self.lock = threading.Lock()  # create the lock
        self.value = start

    def increment(self):
        logging.debug('Waiting for lock')
        self.lock.acquire()  # acquire the lock
        try:
            logging.debug('Acquired lock')
            self.value = self.value + 1
        finally:
            self.lock.release()  # always release, even if an exception occurs


def worker(c):
    for i in range(2):
        pause = random.random()
        logging.debug('Sleeping %0.02f', pause)
        time.sleep(pause)
        c.increment()
    logging.debug('Done')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

counter = Counter()  # shared by both worker threads
for i in range(2):
    t = threading.Thread(target=worker, args=(counter,))
    t.start()

logging.debug('Waiting for worker threads')
main_thread = threading.main_thread()
for t in threading.enumerate():
    if t is not main_thread:
        t.join()
logging.debug('Counter: %d', counter.value)
```
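Output: in this example, worker() increments a Counter instance. The Counter manages a Lock that prevents two threads from changing its internal state at the same time. Without the lock, a change to the value attribute could be lost.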
```
(Thread-1  ) Sleeping 0.80
(Thread-2  ) Sleeping 0.70
(MainThread) Waiting for worker threads
(Thread-2  ) Waiting for lock
(Thread-2  ) Acquired lock
(Thread-2  ) Sleeping 0.01
(Thread-2  ) Waiting for lock
(Thread-2  ) Acquired lock
(Thread-2  ) Done
(Thread-1  ) Waiting for lock
(Thread-1  ) Acquired lock
(Thread-1  ) Sleeping 0.63
(Thread-1  ) Waiting for lock
(Thread-1  ) Acquired lock
(Thread-1  ) Done
(MainThread) Counter: 4
```
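lock.acquire() blocks while another thread holds the lock. To try to acquire it without blocking, use have_it = lock.acquire(0), which is equivalent to lock.acquire(blocking=False). In the next example, lock_holder() loops between holding and releasing the lock, while worker() makes non-blocking attempts to acquire it.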
```python
import logging
import threading
import time


def lock_holder(lock):
    logging.debug('Starting')
    while True:
        lock.acquire()
        try:
            logging.debug('Holding')
            time.sleep(0.5)
        finally:
            logging.debug('Not holding')
            lock.release()
        time.sleep(0.5)


def worker(lock):
    logging.debug('Starting')
    num_tries = 0
    num_acquires = 0
    while num_acquires < 3:
        time.sleep(0.5)
        logging.debug('Trying to acquire')
        have_it = lock.acquire(0)  # non-blocking: returns True or False immediately
        try:
            num_tries += 1
            if have_it:
                logging.debug('Iteration %d: Acquired',
                              num_tries)
                num_acquires += 1
            else:
                logging.debug('Iteration %d: Not acquired',
                              num_tries)
        finally:
            if have_it:
                lock.release()
    logging.debug('Done after %d iterations', num_tries)


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

lock = threading.Lock()

holder = threading.Thread(
    target=lock_holder,
    args=(lock,),
    name='LockHolder',
    daemon=True,  # loops forever, so it must not keep the program alive
)
holder.start()

# A separate variable name avoids shadowing the worker() function
worker_thread = threading.Thread(
    target=worker,
    args=(lock,),
    name='Worker',
)
worker_thread.start()
```
Output:
```
(LockHolder) Starting
(LockHolder) Holding
(Worker    ) Starting
(LockHolder) Not holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 1: Acquired
(LockHolder) Holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 2: Not acquired
(LockHolder) Not holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 3: Acquired
(LockHolder) Holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 4: Not acquired
(LockHolder) Not holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 5: Acquired
(Worker    ) Done after 5 iterations
```
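acquire() also takes a timeout argument, which sits between blocking forever and not waiting at all. In the sketch below, the main thread holds the lock while a second thread waits at most 0.05 s for it and then gives up. try_with_timeout() and outcome are hypothetical names. Note that combining blocking=False with a timeout is not allowed and raises ValueError.

```python
import threading

lock = threading.Lock()
lock.acquire()                         # the main thread holds the lock

outcome = {}


def try_with_timeout():
    # Block for at most 0.05 s instead of forever; returns False on timeout
    outcome['got_it'] = lock.acquire(timeout=0.05)
    if outcome['got_it']:
        lock.release()


t = threading.Thread(target=try_with_timeout)
t.start()
t.join()
lock.release()                         # the main thread releases its own hold
print(outcome)                         # {'got_it': False}
```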
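A normal Lock cannot be acquired more than once, not even by the thread that already holds it. This can cause undesirable side effects if several functions in the same call chain access the lock.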
```python
import threading

lock = threading.Lock()

print('First try :', lock.acquire())
print('Second try:', lock.acquire(0))  # non-blocking, so it returns False instead of deadlocking
```
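Output: the second call, lock.acquire(0), is non-blocking (blocking=0), so it returns False immediately instead of waiting forever for a lock that the same thread already holds.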
```
First try : True
Second try: False
```
When separate code in the same thread needs to "re-acquire" the lock, use an RLock instead.
```python
import threading

lock = threading.RLock()

print('First try :', lock.acquire())
print('Second try:', lock.acquire(0))
```
Output:
```
First try : True
Second try: True
```
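A typical use of RLock is a recursive function that takes the lock on every call. The owning thread must release an RLock as many times as it acquired it. When every acquire is matched by a release, as with the with statement below, another thread can take the lock afterward. countdown() and try_from_other_thread() are hypothetical helpers.

```python
import threading

lock = threading.RLock()


def countdown(n, log):
    # Each recursive call re-acquires the RLock this thread already holds;
    # with a plain Lock the first recursive call would block forever
    with lock:
        log.append(n)
        if n > 0:
            countdown(n - 1, log)


log = []
countdown(3, log)
print(log)  # [3, 2, 1, 0]

# Every acquire was matched by a release, so another thread can now take the lock
result = {}


def try_from_other_thread():
    result['acquired'] = lock.acquire(blocking=False)
    if result['acquired']:
        lock.release()


t = threading.Thread(target=try_from_other_thread)
t.start()
t.join()
print(result)  # {'acquired': True}
```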
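Locks implement the context manager API and are compatible with the with statement. Using with removes the need to acquire and release the lock explicitly.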
```python
import logging
import threading


def worker_with(lock):
    with lock:  # acquire() on entry, release() on exit
        logging.debug('Lock acquired via with')


def worker_no_with(lock):
    lock.acquire()
    try:
        logging.debug('Lock acquired directly')
    finally:
        lock.release()


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

lock = threading.Lock()
w = threading.Thread(target=worker_with, args=(lock,))
nw = threading.Thread(target=worker_no_with, args=(lock,))

w.start()
nw.start()
```
Output: the two functions worker_with() and worker_no_with() manage the lock in equivalent ways.
```
(Thread-1  ) Lock acquired via with
(Thread-2  ) Lock acquired directly
```
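The Counter idea from the Lock section can be written more compactly with the with statement. In this sketch, four threads each increment the counter 10,000 times. Because the read-modify-write happens while the lock is held, no increment is lost and the final value is exact. The class and worker() are illustrative, not a library API.

```python
import threading


class Counter:
    """Thread-safe counter: the read-modify-write happens while holding the lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self.value = 0

    def increment(self):
        with self._lock:          # released on exit, even if an exception occurs
            self.value += 1


def worker(counter, times):
    for _ in range(times):
        counter.increment()


counter = Counter()
threads = [threading.Thread(target=worker, args=(counter, 10_000)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter.value)  # 40000
```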
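Besides using an Event, another way to synchronize threads is with a Condition object. A Condition uses a Lock, so it can be tied to a shared resource and let multiple threads wait for that resource to be updated. In this example, the consumer() threads wait for the Condition to be signaled before continuing. The producer() thread is responsible for signaling the Condition and notifying the other threads that they can continue.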
```python
import logging
import threading
import time


def consumer(cond):
    """wait for the condition and use the resource"""
    logging.debug('Starting consumer thread')
    with cond:
        cond.wait()  # releases the lock while waiting, re-acquires it on wake-up
        logging.debug('Resource is available to consumer')


def producer(cond):
    """set up the resource to be used by the consumer"""
    logging.debug('Starting producer thread')
    with cond:
        logging.debug('Making resource available')
        cond.notify_all()  # notifyAll() is a deprecated alias


logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s (%(threadName)-2s) %(message)s',
)

condition = threading.Condition()
c1 = threading.Thread(name='c1', target=consumer,
                      args=(condition,))
c2 = threading.Thread(name='c2', target=consumer,
                      args=(condition,))
p = threading.Thread(name='p', target=producer,
                     args=(condition,))

# The sleeps give both consumers time to start waiting before the producer notifies
c1.start()
time.sleep(0.2)
c2.start()
time.sleep(0.2)
p.start()
```
Output: this example acquires the Condition's lock with the with statement. Calling acquire() and release() explicitly would also work.
```
2018-05-01 15:39:49,561 (c1) Starting consumer thread
2018-05-01 15:39:49,761 (c2) Starting consumer thread
2018-05-01 15:39:49,962 (p ) Starting producer thread
2018-05-01 15:39:49,962 (p ) Making resource available
2018-05-01 15:39:49,963 (c1) Resource is available to consumer
2018-05-01 15:39:49,964 (c2) Resource is available to consumer
```
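The example above relies on sleep() so that the consumers are already waiting when the producer calls notify_all(). If the notification came first, a bare wait() would block forever. Condition.wait_for() avoids this. It waits until a predicate on the shared resource is true, and it returns immediately if the predicate already holds. The sketch below uses a plain list as the hypothetical resource and deliberately starts the producer first.

```python
import threading

items = []                 # hypothetical shared resource
consumed = []
cond = threading.Condition()


def consumer():
    with cond:
        # wait_for() checks the predicate before waiting and again after every
        # wake-up, so an early notify() or a spurious wake-up is harmless
        cond.wait_for(lambda: len(items) > 0)
        consumed.append(items.pop(0))


def producer():
    with cond:
        items.append('resource')
        cond.notify()      # wake one waiting consumer, if any


p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start()                  # even if the producer runs first, the consumer still sees the item
c.start()
p.join()
c.join()
print(consumed, items)     # ['resource'] []
```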
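A Barrier is another thread synchronization mechanism.

threading.Barrier(parties, action=None, timeout=None)

This creates a Barrier object. parties is the number of threads that must call wait() before any of them can continue. action, if given, is a callable that one of the threads runs when they are released. timeout is the default timeout for wait() when none is passed to it.

- n_waiting: the number of threads currently waiting at the barrier.
- parties: the number of threads required to pass the barrier.
- wait(timeout=None): waits to pass the barrier and returns an integer from 0 to parties - 1 that is different for each thread. If a timeout is in effect and it expires, the barrier is put into the broken state and wait() raises BrokenBarrierError.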
```python
import threading
import time


def worker(barrier):
    print(threading.current_thread().name,
          'waiting for barrier with {} others'.format(
              barrier.n_waiting))
    worker_id = barrier.wait()  # blocks until NUM_THREADS threads are waiting
    print(threading.current_thread().name, 'after barrier',
          worker_id)


NUM_THREADS = 3

barrier = threading.Barrier(NUM_THREADS)

threads = [
    threading.Thread(
        name='worker-%s' % i,
        target=worker,
        args=(barrier,),
    )
    for i in range(NUM_THREADS)
]

for t in threads:
    print(t.name, 'starting')
    t.start()
    time.sleep(0.1)

for t in threads:
    t.join()
```
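Output: in this example, the Barrier is configured to block until three threads are waiting. When that condition is met, all of the threads are released past the control point at the same time.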
```
worker-0 starting
worker-0 waiting for barrier with 0 others
worker-1 starting
worker-1 waiting for barrier with 1 others
worker-2 starting
worker-2 waiting for barrier with 2 others
worker-2 after barrier 2
worker-0 after barrier 0
worker-1 after barrier 1
```
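The action parameter is not used in the example above. In the sketch below, on_release() is a hypothetical callback. One of the waiting threads calls it exactly once, after all parties have arrived and before any of them is released. Each thread also records the index returned by wait(), and the indexes cover 0 to parties - 1.

```python
import threading

NUM_THREADS = 3
action_calls = []
positions = []
positions_lock = threading.Lock()


def on_release():
    # Runs once per barrier cycle, in one of the worker threads
    action_calls.append(threading.current_thread().name)


barrier = threading.Barrier(NUM_THREADS, action=on_release)


def worker():
    index = barrier.wait()        # a different value in 0..NUM_THREADS-1 per thread
    with positions_lock:
        positions.append(index)


threads = [threading.Thread(target=worker, name=f'worker-{i}') for i in range(NUM_THREADS)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(action_calls), sorted(positions))  # 1 [0, 1, 2]
```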
abort() puts the Barrier into the broken state. Any thread that is waiting, or that calls wait() afterward, raises threading.BrokenBarrierError.
```python
import threading
import time


def worker(barrier):
    print(threading.current_thread().name,
          'waiting for barrier with {} others'.format(
              barrier.n_waiting))
    try:
        worker_id = barrier.wait()
    except threading.BrokenBarrierError:
        print(threading.current_thread().name, 'aborting')
    else:
        print(threading.current_thread().name, 'after barrier',
              worker_id)


NUM_THREADS = 3

barrier = threading.Barrier(NUM_THREADS + 1)  # one more party than threads started

threads = [
    threading.Thread(
        name='worker-%s' % i,
        target=worker,
        args=(barrier,),
    )
    for i in range(NUM_THREADS)
]

for t in threads:
    print(t.name, 'starting')
    t.start()
    time.sleep(0.1)

time.sleep(0.5)
barrier.abort()  # break the barrier: every waiting thread gets BrokenBarrierError

for t in threads:
    t.join()
```
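Output: the barrier expects one more participating thread than are actually started, so all of the threads block. Calling abort() then releases them all with BrokenBarrierError.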
```
worker-0 starting
worker-0 waiting for barrier with 0 others
worker-1 starting
worker-1 waiting for barrier with 1 others
worker-2 starting
worker-2 waiting for barrier with 2 others
worker-0 aborting
worker-1 aborting
worker-2 aborting
```
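A timeout breaks a barrier in the same way as abort(). The sketch below uses the default timeout set in the constructor, and only one of the two required parties ever arrives. wait() therefore raises BrokenBarrierError, and the broken attribute reports the state. reset() returns the barrier to its default, empty state so that it can be reused.

```python
import threading

# Two parties are required, but only one thread (the main thread) will arrive
barrier = threading.Barrier(2, timeout=0.05)   # default timeout used by wait()

try:
    barrier.wait()
    outcome = 'passed'
except threading.BrokenBarrierError:
    outcome = 'broken'        # the timeout expired and broke the barrier

broken_after_timeout = barrier.broken
barrier.reset()               # back to the default, empty state
print(outcome, broken_after_timeout, barrier.broken)   # broken True False
```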
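Sometimes you need to let more than one worker access a resource at the same time while still limiting the total number. For example, a connection pool might support a fixed number of simultaneous connections, or a network application might support a fixed number of concurrent downloads. A Semaphore is one way to manage those connections.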
```python
import logging
import threading
import time


class ActivePool:
    """Tracks which threads are currently inside the semaphore-guarded section."""

    def __init__(self):
        self.active = []
        self.lock = threading.Lock()

    def makeActive(self, name):
        with self.lock:
            self.active.append(name)
            logging.debug('Running: %s', self.active)

    def makeInactive(self, name):
        with self.lock:
            self.active.remove(name)
            logging.debug('Running: %s', self.active)


def worker(s, pool):
    logging.debug('Waiting to join the pool')
    # The semaphore as a context manager: acquire on entry, release on exit
    with s:
        name = threading.current_thread().name
        pool.makeActive(name)
        time.sleep(0.1)
        pool.makeInactive(name)


logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s (%(threadName)-2s) %(message)s',
)

pool = ActivePool()
s = threading.Semaphore(2)  # at most 2 threads inside "with s" at once
for i in range(4):
    t = threading.Thread(
        target=worker,
        name=str(i),
        args=(s, pool),
    )
    t.start()
```
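Output: at most two threads run at any moment. A new thread can join the pool only after one of the running threads has finished its work. Until then, it waits.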
```
2018-05-01 18:18:46,359 (0 ) Waiting to join the pool
2018-05-01 18:18:46,360 (0 ) Running: ['0']
2018-05-01 18:18:46,360 (1 ) Waiting to join the pool
2018-05-01 18:18:46,361 (1 ) Running: ['0', '1']
2018-05-01 18:18:46,361 (2 ) Waiting to join the pool
2018-05-01 18:18:46,362 (3 ) Waiting to join the pool
2018-05-01 18:18:46,460 (0 ) Running: ['1']
2018-05-01 18:18:46,460 (2 ) Running: ['1', '2']
2018-05-01 18:18:46,461 (1 ) Running: ['2']
2018-05-01 18:18:46,461 (3 ) Running: ['2', '3']
2018-05-01 18:18:46,560 (2 ) Running: ['3']
2018-05-01 18:18:46,561 (3 ) Running: []
```
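The sketch below checks the concurrency limit directly. It models the "fixed number of concurrent downloads" case with a hypothetical download() that only sleeps, and it records the peak number of threads inside the guarded section. It uses BoundedSemaphore, a Semaphore variant that raises ValueError if it is released more times than it was acquired. This catches bugs where a slot is returned twice.

```python
import threading
import time

MAX_CONCURRENT = 2
slots = threading.BoundedSemaphore(MAX_CONCURRENT)
stats_lock = threading.Lock()
active = 0
peak = 0


def download(name):
    """Hypothetical 'download' that just sleeps while holding a slot."""
    global active, peak
    with slots:                        # blocks while MAX_CONCURRENT threads hold a slot
        with stats_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)
        with stats_lock:
            active -= 1


threads = [threading.Thread(target=download, args=(str(i),)) for i in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print('peak concurrency:', peak)

# All slots are free again, so one more release() exceeds the initial value
try:
    slots.release()
    over_release = False
except ValueError:
    over_release = True
```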
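Some resources need to be locked so that multiple threads can use them. Others need to be protected so that they are hidden from threads that do not own them. The local() class creates an object that hides values from view in separate threads. Each thread sees only the values it set itself, even though the attribute has the same name.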
```python
import logging
import random
import threading


def show_value(data):
    try:
        val = data.value
    except AttributeError:
        logging.debug('No value yet')
    else:
        logging.debug('value=%s', val)


def worker(data):
    show_value(data)
    data.value = random.randint(1, 100)  # visible only inside this thread
    show_value(data)


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

local_data = threading.local()
show_value(local_data)
local_data.value = 1000
show_value(local_data)

for i in range(2):
    t = threading.Thread(target=worker, args=(local_data,))
    t.start()
```
Output:
```
(MainThread) No value yet
(MainThread) value=1000
(Thread-1  ) No value yet
(Thread-1  ) value=64
(Thread-2  ) No value yet
(Thread-2  ) value=99
```
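The example above relies on log output. The sketch below verifies the isolation directly. Each hypothetical worker stores its own value on one shared threading.local object and reports it under its thread name. The main thread never set the attribute, so it still cannot see it after the workers finish.

```python
import threading

data = threading.local()
seen = {}
seen_lock = threading.Lock()


def worker(value):
    data.value = value              # this attribute exists only in the current thread
    with seen_lock:
        seen[threading.current_thread().name] = data.value


threads = [threading.Thread(target=worker, args=(i * 10,), name=f'T{i}') for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Each worker saw only its own value; the main thread sees none of them
print(sorted(seen.items()), hasattr(data, 'value'))  # [('T0', 0), ('T1', 10), ('T2', 20)] False
```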
To initialize the settings so that all threads start with the same value, use a subclass and set the attributes in __init__().
```python
import logging
import random
import threading


def show_value(data):
    try:
        val = data.value
    except AttributeError:
        logging.debug('No value yet')
    else:
        logging.debug('value=%s', val)


def worker(data):
    show_value(data)
    data.value = random.randint(1, 100)
    show_value(data)


class MyLocal(threading.local):

    def __init__(self, value):
        # threading.local calls __init__ again, with the same arguments,
        # the first time each new thread uses the object
        super().__init__()
        logging.debug('Initializing %r', self)
        self.value = value


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

local_data = MyLocal(1000)
show_value(local_data)

for i in range(2):
    t = threading.Thread(target=worker, args=(local_data,))
    t.start()
```
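Output: __init__() runs on the same object (note the identical address in the repr) once in each thread that uses local_data. This gives every thread the same initial value.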
```
(MainThread) Initializing <__main__.MyLocal object at 0x00000000026014C8>
(MainThread) value=1000
(Thread-1  ) Initializing <__main__.MyLocal object at 0x00000000026014C8>
(Thread-1  ) value=1000
(Thread-1  ) value=52
(Thread-2  ) Initializing <__main__.MyLocal object at 0x00000000026014C8>
(Thread-2  ) value=1000
(Thread-2  ) value=94
```