学习PYTHON之路, DAY 10 进程、线程、协程篇

线程

线程是应用程序中工做的最小单元。它被包含在进程之中,是进程中的实际运做单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中能够并发多个线程,每条线程并行执行不一样的任务。python

直接调用多线程

import threading import time def show(arg): time.sleep(1) print 'thread'+str(arg) for i in range(10): t = threading.Thread(target=show, args=(i,)) t.start() print 'main thread stop'

继承式调用并发

import threading import time class MyThread(threading.Thread): def __init__(self,num): threading.Thread.__init__(self) self.num = num def run(self):#定义每一个线程要运行的函数
 
        print("running on number:%s" %self.num) time.sleep(3) if __name__ == '__main__': t1 = MyThread(1) t2 = MyThread(2) t1.start() t2.start()

更多方法:app

    • start            线程准备就绪,等待CPU调度
    • setName      为线程设置名称
    • getName      获取线程名称
    • setDaemon   设置为后台线程或前台线程(默认)
                         若是是后台线程,主线程执行过程当中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均中止
                          若是是前台线程,主线程执行过程当中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序中止
    • join              逐个执行每一个线程,执行完毕后继续往下执行,该方法使得多线程变得无心义
    • run              线程被cpu调度后自动执行线程对象的run方法
 1 import time  2 import threading  3  
 4  
 5 def run(n):  6  
 7     print('[%s]------running----\n' % n)  8     time.sleep(2)  9     print('--done--') 10  
11 def main(): 12     for i in range(5): 13         t = threading.Thread(target=run,args=[i,]) 14  t.start() 15         t.join(1) 16         print('starting thread', t.getName()) 17  
18  
19 m = threading.Thread(target=main,args=[]) 20 m.setDaemon(True) #将main线程设置为Daemon线程,它作为程序主线程的守护线程,当主线程退出时,m线程也会退出,由m启动的其它子线程会同时退出,无论是否执行完任务
21 m.start() 22 m.join(timeout=2) 23 print("---main thread done----")
守护线程

线程锁(Lock、RLock)

因为线程之间是进行随机调度,而且每一个线程可能只执行n条执行以后,当多个线程同时修改同一条数据时可能会出现脏数据,因此,出现了线程锁 - 同一时刻容许一个线程执行操做。dom

import threading import time gl_num = 0 lock = threading.RLock() def Func(): lock.acquire() global gl_num gl_num +=1 time.sleep(1) print gl_num lock.release() for i in range(10): t = threading.Thread(target=Func) t.start()

信号量(Semaphore)

互斥锁 同时只容许一个线程更改数据,而Semaphore是同时容许必定数量的线程更改数据 ,好比厕全部3个坑,那最多只容许3我的上厕所,后面的人只能等里面有人出来了才能再进去。ide

 1 import threading,time  2  
 3 def run(n):  4  semaphore.acquire()  5     time.sleep(1)  6     print("run the thread: %s" %n)  7  semaphore.release()  8  
 9 if __name__ == '__main__': 10  
11     num= 0 12     semaphore  = threading.BoundedSemaphore(5) #最多容许5个线程同时运行
13     for i in range(20): 14         t = threading.Thread(target=run,args=(i,)) 15         t.start()

事件(event)

python线程的事件用于主线程控制其余线程的执行,事件主要提供了三个方法 set、wait、clear。函数

事件处理的机制:全局定义了一个“Flag”,若是“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,若是“Flag”值为True,那么event.wait 方法时便再也不阻塞。测试

  • clear:将“Flag”设置为False
  • set:将“Flag”设置为True
import threading def do(event): print 'start' event.wait() print 'execute' event_obj = threading.Event() for i in range(10): t = threading.Thread(target=do, args=(event_obj,)) t.start() event_obj.clear() inp = raw_input('input:') if inp == 'true': event_obj.set()

条件(Condition)

使得线程等待,只有知足某条件时,才释放n个线程ui

def condition_func(): ret = False inp = input('>>>') if inp == '1': ret = True return ret def run(n): con.acquire() con.wait_for(condition_func) print("run the thread: %s" %n) con.release() if __name__ == '__main__': con = threading.Condition() for i in range(10): t = threading.Thread(target=run, args=(i,)) t.start()

队列

class queue.Queue(maxsize=0) #先入先出
class queue.LifoQueue(maxsize=0) #last in fisrt out 
class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

Queue.qsize()spa

 

Queue.empty() #return True if empty  

 

 

Queue.full() # return True if full 

 

 

Queue.put(itemblock=Truetimeout=None)Queue.put_nowait(item)Equivalent to put(item, False).

 

 

Queue.get(block=Truetimeout=None)Queue.get_nowait()

 

Equivalent to get(False)

Queue.task_done()

 1 import time,random
 2 import queue,threading
 3 q = queue.Queue()
 4 def Producer(name):
 5   count = 0
 6   while count <20:
 7     time.sleep(random.randrange(3))
 8     q.put(count)
 9     print('Producer %s has produced %s baozi..' %(name, count))
10     count +=1
11 def Consumer(name):
12   count = 0
13   while count <20:
14     time.sleep(random.randrange(4))
15     if not q.empty():
16         data = q.get()
17         print(data)
18         print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
19     else:
20         print("-----no baozi anymore----")
21     count +=1
22 p1 = threading.Thread(target=Producer, args=('A',))
23 c1 = threading.Thread(target=Consumer, args=('B',))
24 p1.start()
25 c1.start()

线程池

 1 import queue
 2 import threading
 3 import time
 4 
 5 
 6 class ThreadPool:
 7     def __init__(self, maxsize=5):
 8         self.maxsize = maxsize
 9         self._q = queue.Queue(maxsize)
10         for i in range(maxsize):
11             self._q.put(threading.Thread)
12         # 【threading.Thread,threading.Thread,threading.Thread,threading.Thread,threading.Thread】
13     def get_thread(self):
14         return self._q.get()
15 
16     def add_thread(self):
17         self._q.put(threading.Thread)
18 
19 pool = ThreadPool(5)
20 
21 def task(arg,p):
22     print(arg)
23     time.sleep(1)
24     p.add_thread()
25 
26 for i in range(100):
27     # threading.Thread类
28     t = pool.get_thread()
29     obj = t(target=task,args=(i,pool,))
30     obj.start()
简单版
  1 #!/usr/bin/env python
  2 # -*- coding:utf-8 -*-
  3 # Author:Alex Li
  4 import queue
  5 import threading
  6 import contextlib
  7 import time
  8 
  9 StopEvent = object()
 10 
 11 class ThreadPool(object):
 12 
 13     def __init__(self, max_num, max_task_num = None):
 14         if max_task_num:
 15             self.q = queue.Queue(max_task_num)
 16         else:
 17             self.q = queue.Queue()
 18         self.max_num = max_num
 19         self.cancel = False
 20         self.terminal = False
 21         self.generate_list = []
 22         self.free_list = []
 23 
 24     def run(self, func, args, callback=None):
 25         """
 26         线程池执行一个任务
 27         :param func: 任务函数
 28         :param args: 任务函数所需参数
 29         :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数一、任务函数执行状态;二、任务函数返回值(默认为None,即:不执行回调函数)
 30         :return: 若是线程池已经终止,则返回True不然None
 31         """
 32         if self.cancel:
 33             return
 34         if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
 35             self.generate_thread()
 36         w = (func, args, callback,)
 37         self.q.put(w)
 38 
 39     def generate_thread(self):
 40         """
 41         建立一个线程
 42         """
 43         t = threading.Thread(target=self.call)
 44         t.start()
 45 
 46     def call(self):
 47         """
 48         循环去获取任务函数并执行任务函数
 49         """
 50         current_thread = threading.currentThread
 51         self.generate_list.append(current_thread)
 52 
 53         event = self.q.get()
 54         while event != StopEvent:
 55 
 56             func, arguments, callback = event
 57             try:
 58                 result = func(*arguments)
 59                 success = True
 60             except Exception as e:
 61                 success = False
 62                 result = None
 63 
 64             if callback is not None:
 65                 try:
 66                     callback(success, result)
 67                 except Exception as e:
 68                     pass
 69 
 70             with self.worker_state(self.free_list, current_thread):
 71                 if self.terminal:
 72                     event = StopEvent
 73                 else:
 74                     event = self.q.get()
 75         else:
 76 
 77             self.generate_list.remove(current_thread)
 78 
 79     def close(self):
 80         """
 81         执行完全部的任务后,全部线程中止
 82         """
 83         self.cancel = True
 84         full_size = len(self.generate_list)
 85         while full_size:
 86             self.q.put(StopEvent)
 87             full_size -= 1
 88 
 89     def terminate(self):
 90         """
 91         不管是否还有任务,终止线程
 92         """
 93         self.terminal = True
 94 
 95         while self.generate_list:
 96             self.q.put(StopEvent)
 97 
 98         self.q.empty()
 99 
100     @contextlib.contextmanager
101     def worker_state(self, state_list, worker_thread):
102         """
103         用于记录线程中正在等待的线程数
104         """
105         state_list.append(worker_thread)
106         try:
107             yield
108         finally:
109             state_list.remove(worker_thread)
110 
111 
112 pool = ThreadPool(5)
113 
114 def callback(status, result):
115     # status, execute action status
116     # result, execute action return value
117     pass
118 
119 def action(i):
120     print(i)
121 
122 for i in range(300):
123     ret = pool.run(action, (i,), callback)
124 
125 # time.sleep(5)
126 # print(len(pool.generate_list), len(pool.free_list))
127 # print(len(pool.generate_list), len(pool.free_list))
复杂版

 上下文管理

 1 import contextlib
 2 
 3 @contextlib.contextmanager #加了这个装饰器,能够用with
 4 def work(free_list, worker_thread):
 5     free_list.append(worker_thread)
 6     try:
 7         yield
 8     finally:
 9         free_list.remove(worker_thread)
10 
11 free_list = []
12 worker_thread = '1'
13 with work(free_list, worker_thread):
14     print(123)

运行顺序

多进程

from multiprocessing import Process
import time
def f(name):
    time.sleep(2)
    print('hello', name)
 
if __name__ == '__main__': #在windos进程只能作测试,必定要写这句
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

进程间通信

Queues

使用方法跟threading里的queue差很少

from multiprocessing import Process, Queue

def f(i,q):
    print(i,q.get())

if __name__ == '__main__':
    q = Queue()

    q.put("h1")
    q.put("h2")
    q.put("h3")

    for i in range(10):
        p = Process(target=f, args=(i,q,))
        p.start()

 Managers

 1 from multiprocessing import Process, Manager
 2  
 3 def f(d, l):
 4     d[1] = '1'
 5     d['2'] = 2
 6     d[0.25] = None
 7     l.append(1)
 8     print(l)
 9  
10 if __name__ == '__main__':
11     with Manager() as manager:
12         d = manager.dict()
13  
14         l = manager.list(range(5))
15         p_list = []
16         for i in range(10):
17             p = Process(target=f, args=(d, l))
18             p.start()
19             p_list.append(p)
20         for res in p_list:
21             res.join()
22  
23         print(d)
24         print(l)

 

协程

协程一个标准定义:

  1. 必须在只有一个单线程里实现并发
  2. 修改共享数据不需加锁
  3. 用户程序里本身保存多个控制流的上下文栈
  4. 一个协程遇到IO操做自动切换到其它协程

greenlet

 

 1 from greenlet import greenlet
 2  
 3  
 4 def test1():
 5     print 12
 6     gr2.switch()
 7     print 34
 8     gr2.switch()
 9  
10  
11 def test2():
12     print 56
13     gr1.switch()
14     print 78
15  
16 gr1 = greenlet(test1)
17 gr2 = greenlet(test2)
18 gr1.switch()

 

gevent

 1 import gevent
 2  
 3 def foo():
 4     print('Running in foo')
 5     gevent.sleep(0)
 6     print('Explicit context switch to foo again')
 7  
 8 def bar():
 9     print('Explicit context to bar')
10     gevent.sleep(0)
11     print('Implicit context switch back to bar')
12  
13 gevent.joinall([
14     gevent.spawn(foo),
15     gevent.spawn(bar),
16 ])
相关文章
相关标签/搜索