Thread实例对象的方法 # isAlive(): 返回线程是不是活动的。 # getName(): 返回线程名。 # setName(): 设置线程名。 threading模块提供的一些方法: # threading.currentThread(): 返回当前的线程变量对象。 # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果
import threading import time from threading import Thread,current_thread def f1(n): time.sleep(1) print('子线程对象', current_thread()) # <Thread(Thread-1, started 123145336967168)> print('子线程名称', current_thread().getName()) # 当前线程对象 Thread-1 print('子线程ID', current_thread().ident) # 123145336967168 print('%s号线程任务'%n) if __name__ == '__main__': t1 = Thread(target=f1,args=(1,)) t1.start() print('主线程对象',current_thread()) # <_MainThread(MainThread, started 140734833878464)> print('主线程名称',current_thread().getName()) # 当前线程对象(是主线程对象) MainThread print('主线程ID',current_thread().ident) # 当前线程ID 140734833878464 print(threading.enumerate()) # [<_MainThread(MainThread, started 140734833878464)>, <Thread(Thread-1, started 123145336967168)>] print(threading.active_count()) # 2 """ 结果: 主线程对象 <_MainThread(MainThread, started 140734833878464)> 主线程名称 MainThread 主线程ID 140734833878464 [<_MainThread(MainThread, started 140734833878464)>, <Thread(Thread-1, started 123145336967168)>] 2 子线程对象 <Thread(Thread-1, started 123145336967168)> 子线程名称 Thread-1 子线程ID 123145336967168 1号线程任务 # 小结: threading.current_thread() <==等效于==> Thread(target=f1) #这两个等效的前提是: 左边 的位置要跟 右边target(目标函数)所在位置 同样,即左边的是获取当前位置的线程变量对象,右边的是在target(目标函数)所在位置建立线程对象. """
线程队列,不须要从threading模块里面导入,直接import queue就能够了,这是python自带的python
queue队列 :使用import queue,用法与进程队列 multiprocessing.Queue 同样,也有如下方法:nginx
# put,put_nowait,get,get_nowait,full,empty,qsize q = Queue(5) # 5是size q.put() #放入数据,满了会等待(阻塞) q.get() #获取数据,没有数据了会等待(阻塞) q.qsize() # 当前放进去的元素的个数 q.empty() # 是否为空,不可靠(由于多线程) q.full() # 是否满了,不可靠(由于多线程) q.put_nowait() #添加数据,不等待,可是队列满了报错 q.get_nowait() #获取数据,不等待,可是队列空了报错
class queue.
Queue
(maxsize=0) #先进先出(FIFO: fisrt in fisrt out)数组
import queue # 线程中的队列使用的是这个,等效于进程中的队列 put,put_nowait,get,get_nowait,full,empty q = queue.Queue(4) # FIFO先进先出 first in first out q.put(1) q.put(2) print(q.full()) # 不满 # print('当前队列内容的长度',q.qsize()) q.put(3) print(q.full()) # 满 # q.put(4) # 不报错,会阻塞 print(q.qsize()) try: q.put_nowait(4) # 报错queue.Full except Exception: print('queue full') print(q.get()) print(q.get()) print(q.empty()) # 不空 print(q.get()) print(q.empty()) # 空 print(q.get()) # 不报错,会阻塞 try: print(q.get_nowait()) # 报错queue.Empty except Exception: print('queue empty')
class queue.
LifoQueue
(maxsize=0) #先进后出队列(或者后进先出(LIFO: last in fisrt out)),相似于栈网络
q = queue.LifoQueue(3) # Lifo q.put(1) q.put(2) print(q.full()) # 不满 # print('当前队列内容的长度',q.qsize()) q.put(3) print(q.full()) # 满 # q.put(4) # 不报错,会阻塞 print(q.qsize()) try: q.put_nowait(4) # 报错queue.Full except Exception: print('queue full') print(q.get()) print(q.get()) print(q.empty()) # 不空 print(q.get()) print(q.empty()) # 空 print(q.get()) # 不报错,会阻塞 try: print(q.get_nowait()) # 报错queue.Empty except Exception: print('queue empty')
class queue.
PriorityQueue
(maxsize=0) #优先级的队列(存储数据时可设置优先级)多线程
# 优先级队列 PriorityQueue # put的数据是一个元组,元组的第一个参数是优先级数字(一般是数字,也能够是非数字之间的比较),数字越小优先级越高,越先被get拿到被取出来,第二个参数是put进去的值(能够是任意的数据类型) # 若是说优先级(第一个参数)相同,那么比较值(第二个参数),值必须是相同的数据类型(不包括字典),不然报错 # 比较第二个参数: # 若是第二个参数(或者其参数的元素)是数字: 数字==直接拿总体的数字==>比较大小, # 若是第二个参数(或者其参数的元素)是字符串:字符串=依次取到每一个字符=>比较每一个字符的ASCII码. q = queue.PriorityQueue(10) q.put((-5, 'alex')) # 放入元组,第一个元素是优先级(能够为负数,越小,优先级越高),第二个是真正的数据(数据类型随意) q.put((2, 'blex')) q.put((3, 'clex')) q.put((3, '111')) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print('=======================') q.put(('x', 123)) q.put(('y', 345)) print(q.get()) print(q.get()) print('=======================') """ ('x', 123) ('y', 345) """ q.put((5, 'alex')) # 放入元组,第一个元素是优先级(能够为负数,越小,优先级越高),第二个是真正的数据(数据类型随意) q.put((2, 1)) q.put((3, (1,))) # q.put((7, {1,2})) # 优先级相同数据类型不一样,报错TypeError: '<' not supported between instances of 'dict' and 'set' q.put((7, {1:2})) q.put((7, {1:'a'})) # 优先级相同数据类型都是字典,报错TypeError: '<' not supported between instances of 'dict' and 'dict' print(q.get()) print(q.get()) print(q.get()) print(q.get()) print('=======================')
统一使用方式,使用threadPollExecutor和ProcessPollExecutor的方式同样,并且只要经过这个concurrent.futures导入就能够直接用他们两个了并发
concurrent.futures模块提供了高度封装的异步调用接口 ThreadPoolExecutor:线程池,提供异步调用 ProcessPoolExecutor: 进程池,提供异步调用 Both implement the same interface, which is defined by the abstract Executor class. 二者实现相同的接口,该接口由抽象Executor类定义。 #2 基本方法 #submit(fn, *args, **kwargs) 异步提交任务(万能传参,传入的实参能够是任意数据类型,注意fn的形参数量要和这里的实参数量对应) #map(func, *iterables, timeout=None, chunksize=1) 取代for循环submit的操做(参数1是函数,参数2是可迭代对象) #shutdown(wait=True) ==>close()+join() 至关于进程池的multiprocessing.Pool().close()+multiprocessing.Pool().join()操做 wait=True,等待池内全部任务执行完毕回收完资源后才继续 wait=False,当即返回,并不会等待池内的任务执行完毕 但无论wait参数为什么值,整个程序都会等到全部任务执行完毕 submit和map必须在shutdown以前 #result(timeout=None) 取得结果(至关于pool.get()) #add_done_callback(fn) 回调函数(功能相似于pool的callback,可是显然用法不一样) """ multiprocessing.Pool和concurrent.futures.ThreadPoolExecutor,ProcessPoolExecutor中回调函数的区别: 进程的回调函数res = pool.apply_async(f1,args=(5,),callback=call_back_func) (这里的callback是默认的关键字,call_back_func是自定义的回调函数名)==>做为异步对象的参数调用 线程的回调函数res = tp.submit(f1,11,12).add_done_callback(f2) (这里的add_done_callback是默认的回调函数名,f2是自定义的回调函数)==>做为异步对象的方法调用) """
上栗子:app
import time import os import threading from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def func(n): time.sleep(2) print('%s打印的:'%(threading.get_ident()),n) return n*n tpool = ThreadPoolExecutor(max_workers=5) #默认通常起线程的数据不超过CPU个数*5 # tpool = ProcessPoolExecutor(max_workers=5) #进程池的使用只须要将上面的ThreadPoolExecutor改成ProcessPoolExecutor就好了,其余都不用改 #异步执行 t_lst = [] for i in range(5): t = tpool.submit(func,i) #提交执行函数,返回一个结果对象,i做为任务函数的参数 def submit(self, fn, *args, **kwargs): 能够传任意形式的参数 t_lst.append(t) # # print(t.result()) #这个返回的结果对象t,不能直接去拿结果,否则又变成串行了,能够理解为拿到一个号码,等全部线程的结果都出来以后,咱们再去经过结果对象t获取结果 tpool.shutdown() #起到原来的close阻止新任务进来 + join的做用,等待全部的线程执行完毕 print('主线程') for ti in t_lst: print('>>>>',ti.result()) # 咱们还能够不用shutdown(),用下面这种方式 # while 1: # for n,ti in enumerate(t_lst): # print('>>>>', ti.result(),n) # time.sleep(2) #每一个两秒去去一次结果,哪一个有结果了,就能够取出哪个,想表达的意思就是说不用等到全部的结果都出来再去取,能够轮询着去取结果,由于你的任务须要执行的时间很长,那么你须要等好久才能拿到结果,经过这样的方式能够将快速出来的结果先拿出来。若是有的结果对象里面尚未执行结果,那么你什么也取不到,这一点要注意,不是空的,是什么也取不到,那怎么判断我已经取出了哪个的结果,能够经过枚举enumerate来搞,记录你是哪个位置的结果对象的结果已经被取过了,取过的就再也不取了 #结果分析: 打印的结果是没有顺序的,由于到了func函数中的sleep的时候线程会切换,谁先打印就没准儿了,可是最后的咱们经过结果对象取结果的时候拿到的是有序的,由于咱们主线程进行for循环的时候,咱们是按顺序将结果对象添加到列表中的。 # 37220打印的: 0 # 32292打印的: 4 # 33444打印的: 1 # 30068打印的: 2 # 29884打印的: 3 # 主线程 # >>>> 0 # >>>> 1 # >>>> 4 # >>>> 9 # >>>> 16
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import time def f1(n,s): # 要与 万能传参 的参数数量一致 time.sleep(1) # print(n,s) return n * n if __name__ == '__main__': tp = ThreadPoolExecutor(4) # 线程 默认的线程个数是cpu个数 * 5 # tp = ProcessPoolExecutor(4) # 进程 默认的进程个数是cpu个数 这两个的方法一致 # tp.map(f1, range(10)) # 异步提交任务,参数是(任务名,可迭代对象) res_lis = [] for i in range(10): res = tp.submit(f1,i,'baobao') # submit是给线程池异步提交任务,万能传参 # print(res) # <Future at 0x10617a208 state=running> res_lis.append(res) for t in res_lis: # 4个4个的打印 print(t.result()) tp.shutdown() # ==等效于==> close + join 主线程等待全部提交给线程池的任务所有执行完毕 for t in res_lis: # 所有一块儿打印 print(t.result()) # 结果对象.result,#和get方法同样,若是没有结果,会等待,阻塞程序 print('主线程') """ 只须要将这一行代码改成下面这一行就能够了,其余的代码都不用变 tpool = ThreadPoolExecutor(max_workers=5) #默认通常起线程的数据不超过CPU个数*5 # tpool = ProcessPoolExecutor(max_workers=5)#默认通常起线程的数据不超过CPU个数 你就会发现为何将线程池和进程池都放到这一个模块里面了,由于用法同样,因此不论是线程池仍是进程池,更推荐使用这个from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor """
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def func(n): time.sleep(2) return n*n def call_back(m): print('结果为:%s'%(m.result())) # 注意回调函数拿到的是线程(进程)对象,想要拿到值须要调用result方法 tpool = ThreadPoolExecutor(max_workers=5) t_lst = [] for i in range(5): t = tpool.submit(func,i).add_done_callback(call_back) """ 结果为:0 结果为:1 结果为:4 结果为:9 结果为:16 """
协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序本身控制调度的。负载均衡
须要强调的是:异步
#1. python的线程属于内核级别的,即由操做系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其余线程运行) #2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操做系统)控制切换,以此来提高效率(!!!非io操做的切换与效率无关)
操做系统控制线程的切换 <==对比==> 用户在单线程内控制协程的切换socket
#1. 协程的切换开销更小,属于程序级别的切换,操做系统彻底感知不到,于是更加轻量级 #2. 单线程内就能够实现并发的效果,最大限度地利用cpu
#1. 协程的本质是单线程下,没法利用多核,能够是一个程序开启多个进程,每一个进程内开启多个线程,每一个线程内开启协程 #2. 协程指的是单个线程,于是一旦协程出现阻塞,将会阻塞整个线程
# 1.必须在只有一个单线程里实现并发 # 2.修改共享数据不需加锁 # 3.用户程序里本身保存多个控制流的上下文栈 # 4.附加:一个协程遇到IO操做自动切换到其它协程(如何实现检测IO,yield、greenlet都没法实现,就用到了gevent模块(select机制))
协程就是告诉Cpython解释器,你不是nb吗,不是搞了个GIL锁吗,那好,我就本身搞成一个线程让你去执行,省去你切换线程的时间,我本身切换比你切换要快不少,避免了不少的开销,对于单线程下,咱们不可避免程序中出现io操做,但若是咱们能在本身的程序中(即用户程序级别,而非操做系统级别)控制单线程下的多个任务能在一个任务遇到io阻塞时就切换到另一个任务去计算,这样就保证了该线程可以最大限度地处于就绪态,即随时均可以被cpu执行的状态,至关于咱们在用户程序级别将本身的io操做最大限度地隐藏起来,从而能够迷惑操做系统,让其看到:该线程好像是一直在计算,io比较少,从而更多的将cpu的执行权限分配给咱们的线程。
协程的本质就是在单线程下,由用户本身控制一个任务遇到io阻塞了就切换另一个任务去执行,以此来提高效率。为了实现它,咱们须要找寻一种能够同时知足如下条件的解决方案:
#1. 能够控制多个任务之间的切换,切换以前将任务的状态保存下来,以便从新运行时,能够基于暂停的位置继续执行。 #2. 做为1的补充:能够检测io操做,在遇到io操做的状况下才发生切换
并发的本质:任务切换+保存状态,yield自己就是一种在单线程下能够保存任务运行状态的方法,
#1 yield能够保存状态,yield的状态保存与操做系统的保存线程状态很像,可是yield是代码级别控制的,更轻量级 #2 send能够把一个函数的结果传给另一个函数,以此实现单线程内程序之间的切换
import time #基于yield并发执行,多任务之间来回切换,这就是个简单的协程的体现,可是他可以节省I/O时间吗?不能,yield不能检测IO,不能实现遇到IO自动切换 def f1(): for i in range(3): time.sleep(0.5) # 发现什么?只是进行了切换,可是并无节省I/O时间 print('f1>>',i) # yield def f2(): # g = f1() for i in range(3): time.sleep(0.5) print('f2>>', i) # next(g) #不写yield,下面两个任务是执行完func1里面全部的程序才会执行func2里面的程序,有了yield,咱们实现了两个任务的切换+保存状态 #基于yield保存状态,实现两个任务直接来回切换,即并发的效果 #PS:若是每一个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的. f1() f2() """ f1>> 0 f1>> 1 f1>> 2 f2>> 0 f2>> 1 f2>> 2 有了yield: f2>> 0 f1>> 0 f2>> 1 f1>> 1 f2>> 2 f1>> 2
#安装==>在终端输入如下代码 pip3 install greenlet
import time from greenlet import greenlet # 真正的协程模块就是使用greenlet完成的切换 def f1(s): print('第一次f1==>'+s) g2.switch('taibai') #切换到g2这个对象的任务去执行 time.sleep(1) print('第一次f1==>'+s) g2.switch() def f2(s): print('第一次f2==>'+s) g1.switch() time.sleep(1) print('第二次f2==>'+s) g1 = greenlet(f1) #实例化一个greenlet对象,并将任务名称做为参数传进去 g2 = greenlet(f2) g1.switch('alex') #执行g1对象里面的任务,能够在第一次switch时传入参数,之后都不须要 """ greenlet只是提供了一种比generator更加便捷的切换方式,当切到一个任务执行时若是遇到io,那就原地阻塞,仍然是没有解决遇到IO自动切换来提高效率的问题。 """
通常在工做中咱们都是进程+线程+协程的方式来实现并发,以达到最好的并发效果,若是是4核的cpu,通常起5个进程,每一个进程中20个线程(5倍cpu数量),每一个线程能够起500个协程,大规模爬取页面的时候,等待网络延迟的时间的时候,咱们就能够用协程去实现并发。 并发数量 = 5 * 20 * 500 = 50000个并发,这是通常一个4cpu的机器最大的并发数。nginx在负载均衡的时候最大承载量就是5w个。
#安装==>在终端输入如下代码 pip3 install gevent
from gevent import monkey;monkey.patch_all() # 必须写在最上面,这句话后面的全部阻塞所有可以识别了 import gevent import time import threading # 遇到IO阻塞时会自动切换任务 def f1(name): print(f'{name}==第一次f1') print(threading.current_thread().getName()) # DummyThread-1 假线程,虚拟线程 # gevent.sleep(1) # gevent默承认以识别的io阻塞 time.sleep(2) # 加上mokey就可以识别到time模块的sleep了 print(f'{name}==第二次f1') return name def f2(name): print(threading.current_thread().getName()) # DummyThread-2 print(f'{name}==第一次f2') # gevent.sleep(2) time.sleep(2) # 来回切换,直到一个I/O的时间结束,这里都是咱们的gevent作得,再也不是控制不了的操做系统了。 print(f'{name}==第二次f2') s = time.time() g1 = gevent.spawn(f1,'alex') #异步提交了f1任务 g2 = gevent.spawn(f2,name='egon') #建立一个协程对象g2,spawn括号内第一个参数是函数名,如f2,后面能够有多个参数,能够是位置实参或关键字实参,都是传给函数f2的,spawn是异步提交任务 # g1.join() # 等待g1结束,上面只是建立协程对象,这个join才是去执行 # g2.join() # 等待g2结束 有人测试的时候会发现,不写第二个join也能执行g2,是的,协程帮你切换执行了,可是你会发现,若是g2里面的任务执行的时间长,可是不写join的话,就不会执行完等到g2剩下的任务了 gevent.joinall([g1,g2]) # 这里等价于上述join两步合做一步 print(g1.value)#拿到func1的返回值 e = time.time() print('执行时间:',e-s) # 测试执行时间 print('主程序任务') """ 结果: alex==第一次f1 DummyThread-1 DummyThread-2 egon==第一次f2 alex==第二次f1 egon==第二次f2 alex 执行时间: 2.004991054534912 主程序任务 """ ''' # spawn是类方法,参数是万能的 @classmethod def spawn(cls, *args, **kwargs): # 万能形参==>实参能够随便传入 g = cls(*args, **kwargs) g.start() return g ''' # 咱们能够用threading.current_thread().getName()来查看每一个g1和g2,查看的结果为DummyThread-n,即假线程,虚拟线程,其实都在一个线程里面 # 进程线程的任务切换是由操做系统自行切换的,你本身不能控制 # 协程是经过本身的程序(代码)来进行切换的,本身可以控制,只有遇到协程模块可以识别的IO操做的时候,程序才会进行任务切换,实现并发效果,若是全部程序都没有IO操做,那么就基本属于串行执行了。
from gevent import spawn,joinall,monkey;monkey.patch_all() import time def task(pid): """ Some non-deterministic task """ time.sleep(0.5) print('Task %s done' % pid) def synchronous(): for i in range(10): task(i) def asynchronous(): g_l=[spawn(task,i) for i in range(10)] joinall(g_l) if __name__ == '__main__': print('Synchronous:') synchronous() print('Asynchronous:') asynchronous() #上面程序的重要部分是将task函数封装到greenlet内部线程的gevent.spawn。 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,后者阻塞当前流程,并执行全部给定的greenlet。执行流程只会在 全部greenlet执行完后才会继续向下走。 """ # 结果: Synchronous:同步,一个一个的打印 Task 0 done Task 1 done Task 2 done Task 3 done Task 4 done Task 5 done Task 6 done Task 7 done Task 8 done Task 9 done Asynchronous:异步,一块儿打印 Task 0 done Task 1 done Task 2 done Task 3 done Task 4 done Task 5 done Task 6 done Task 7 done Task 8 done Task 9 done """
1,线程的其余方法
Threading.current_thread() #当前线程对象
GetName() 获取线程名
Ident 获取线程id
Threading.Enumerate() #当前正在运行的线程对象的一个列表
Threading.active_count() #当前正在运行的线程数量
2,线程队列(重点)
Import queue
先进先出队列:queue.Queue(3)
先进后出\后进先出队列:queue.LifoQueue(3)
优先级队列:queue.priorityQueue(3)
Put的数据是一个元组,元组的第一个参数是优先级数字,数字越小优先级越高,越先被get到被取出来,第二个参数是put进去的值,若是说优先级相同,那么值别忘了应该是相同的数据类型,字典不行
From concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
P = ThreadPoolExecutor(4) #默认的线程个数是cpu个数 * 5
P = ProcessPoolExecutor(4) #默认的进程个数是cpu个数
P.map(f1,可迭代的对象) #异步执行
Def f1(n1,n2):
Print(n1,n2)
P.submit(f1,11,12) #异步提交任务
Res = P.submit(f1,11,12)
Res.result() #和get方法同样,若是没有结果,会等待,阻塞程序
Shutdown() #close+join,锁定线程池,等待线程池中全部已经提交的任务所有执行完毕
今日做业
明天默写: