Python多线程与队列

Python多线程与Queue队列多线程在感官上相似于同时执行多个程序,虽然因为GIL的存在,在Python中没法实现线程的真正并行,可是对于某些场景,多线程仍不失为一个有效的处理方法:python

1,不紧急的,无需阻塞主线程的任务,此时能够利用多线程在后台慢慢处理;
2,IO密集型操做,好比文件读写、用户输入和网络请求等,此时多线程能够近似达到甚至优于多进程的表现;
网络

多线程的基本使用再也不赘述,如下语法即可轻松实现:多线程

1 def task(args1, args2):
2     pass
3 
4 Thread(
5     target=task,
6     args=(args1, args2)
7 ).start()


这里咱们重点关注线程通讯。ide

假设有这么一种场景:有一批源数据,指定一个操做系数N,须要分别对其进行与N的加减乘除操做,并将结果汇总。
固然这里的加减乘除只是一种简单处理,在实际的生产环境中,它其实表明了一步较为复杂的业务操做,并包含了较多的IO处理。测试

天然咱们想到能够开启多线程处理,那么紧接着的问题即是:如何划分线程,是根据处理步骤划分,仍是根据源数据划分?spa

对于前者,咱们把涉及的业务操做单独划分位一个线程,即有4个线程分别进行加减乘除的操做,显然上一个线程的结果是下一个线程的输入,这相似于流水线操做;线程

然后者则是把源数据分为若干份,每份启动一个线程进行处理,最终把结果汇总。通常来讲,咱们推荐第一种方式。由于在一个线程中完成全部的操做不如每步一个线程清晰明了,3d

尤为是在一些复杂的场景下,会加大单个线程的出错几率和测试难度。日志

那么咱们将开辟4个线程,分别执行加减乘除操做。最后一个除法线程结束则任务完成:code

 

 1 #!/usr/bin/env python
 2 # -*- coding: utf-8 -*-
 3 
 4 from Queue import Queue
 5 from threading import Thread
 6 
 7 
 8 class NumberHandler(object):
 9     def __init__(self, n):
10         self.n = n
11 
12     def add(self, num):
13         return num + self.n
14 
15     def subtract(self, num):
16         return num - self.n
17 
18     def multiply(self, num):
19         return num * self.n * self.n
20 
21     def divide(self, num):
22         return num / self.n
23 
24 
25 class ClosableQueue(Queue):
26     SENTINEL = object()
27 
28     def close(self):
29         self.put(self.SENTINEL)
30 
31     def __iter__(self):
32         while True:
33             item = self.get()
34             try:
35                 if item is self.SENTINEL:
36                     return
37                 yield item
38             finally:
39                 self.task_done()
40 
41 
42 class StoppableWorker(Thread):
43     def __init__(self, func, in_queue, out_queue):
44         super(StoppableWorker, self).__init__()
45         self.in_queue = in_queue
46         self.out_queue = out_queue
47         self.func = func
48 
49     def run(self):
50         for item in self.in_queue:
51             result = self.func(item)
52             self.out_queue.put(result)
53             print self.func
54 
55 
56 if __name__ == '__main__':
57     source_queue = ClosableQueue()
58     add_queue = ClosableQueue()
59     subtract_queue = ClosableQueue()
60     multiply_queue = ClosableQueue()
61     divide_queue = ClosableQueue()
62     result_queue = ClosableQueue()
63 
64     number_handler = NumberHandler(5)
65 
66     threads = [
67         StoppableWorker(number_handler.add, add_queue, subtract_queue),
68         StoppableWorker(number_handler.subtract, subtract_queue, multiply_queue),
69         StoppableWorker(number_handler.multiply, multiply_queue, divide_queue),
70         StoppableWorker(number_handler.divide, divide_queue, result_queue),
71     ]
72 
73     for _thread in threads:
74         _thread.start()
75 
76     for i in range(10):
77         add_queue.put(i)
78 
79     add_queue.close()
80     add_queue.join()
81     print 'add job done...'
82     subtract_queue.close()
83     subtract_queue.join()
84     print 'subtract job done...'
85     multiply_queue.close()
86     multiply_queue.join()
87     print 'multiply job done...'
88     divide_queue.close()
89     divide_queue.join()
90     print 'divide job done...'
91     result_queue.close()
92 
93     print "%s items finished, result: %s" % (result_queue.qsize(), result_queue)
94 
95     for i in result_queue:
96         print i

运行结果:

线程执行日志:

 

 

 总的结果:

 

 可见线程交叉运行,可是任务倒是顺序结束,这符合咱们的预期。

值得注意的是,咱们在ClosableQueue定义了一个close()方法,经过放入一个特殊的类变量SENTINEL告诉队列应该关闭。此外,因为直接加减乘除结果不变,所以我特地乘了两次来便于咱们判断结果。

总结:

1. Queue是一种高效的任务处理方式,它能够把任务处理流程划分为若干阶段,并使用多条python线程来同时执行这些子任务;

2. Queue类具有阻塞式的队列操做、可以指定缓冲区尺寸,并且还支 持join方法,这使得开发者能够构建出健壮的流水线。

相关文章
相关标签/搜索