day34 线程池 协程

线程的其余方法

Thread实例对象的方法
  # isAlive(): 返回线程是不是活动的。
  # getName(): 返回线程名。
  # setName(): 设置线程名。

threading模块提供的一些方法:
  # threading.currentThread(): 返回当前的线程变量对象。
  # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果
其余方法
import threading
import time
from threading import Thread,current_thread

def f1(n):
    time.sleep(1)
    print('子线程对象', current_thread())  # <Thread(Thread-1, started 123145336967168)>
    print('子线程名称', current_thread().getName())  # 当前线程对象 Thread-1
    print('子线程ID', current_thread().ident)  # 123145336967168
    print('%s号线程任务'%n)


if __name__ == '__main__':
    t1 = Thread(target=f1,args=(1,))
    t1.start()

    print('主线程对象',current_thread())  # <_MainThread(MainThread, started 140734833878464)>
    print('主线程名称',current_thread().getName())  # 当前线程对象(是主线程对象) MainThread
    print('主线程ID',current_thread().ident)  # 当前线程ID 140734833878464

    print(threading.enumerate()) # [<_MainThread(MainThread, started 140734833878464)>, <Thread(Thread-1, started 123145336967168)>]
    print(threading.active_count())  # 2

"""
结果:
主线程对象 <_MainThread(MainThread, started 140734833878464)>
主线程名称 MainThread
主线程ID 140734833878464
[<_MainThread(MainThread, started 140734833878464)>, <Thread(Thread-1, started 123145336967168)>]
2
子线程对象 <Thread(Thread-1, started 123145336967168)>
子线程名称 Thread-1
子线程ID 123145336967168
1号线程任务


# 小结:
                                      
threading.current_thread()  <==等效于==> Thread(target=f1)
#这两个等效的前提是: 左边  的位置要跟 右边target(目标函数)所在位置 同样,即左边的是获取当前位置的线程变量对象,右边的是在target(目标函数)所在位置建立线程对象.
"""
栗子

 

线程队列 (重点)

线程队列,不须要从threading模块里面导入,直接import queue就能够了,这是python自带的python

queue队列 :使用import queue,用法与进程队列 multiprocessing.Queue 同样,也有如下方法:nginx

# put,put_nowait,get,get_nowait,full,empty,qsize
q = Queue(5)  # 5是size
q.put()  #放入数据,满了会等待(阻塞)
q.get()  #获取数据,没有数据了会等待(阻塞)

q.qsize()  # 当前放进去的元素的个数

q.empty()  # 是否为空,不可靠(由于多线程)
q.full() # 是否满了,不可靠(由于多线程)

q.put_nowait()   #添加数据,不等待,可是队列满了报错
q.get_nowait()  #获取数据,不等待,可是队列空了报错

class queue.Queue(maxsize=0) #先进先出(FIFO: fisrt in fisrt out)数组

import queue  # 线程中的队列使用的是这个,等效于进程中的队列  put,put_nowait,get,get_nowait,full,empty
q = queue.Queue(4)  # FIFO先进先出  first in first out


q.put(1)
q.put(2)
print(q.full())  # 不满
# print('当前队列内容的长度',q.qsize())
q.put(3)
print(q.full())  #
# q.put(4)  # 不报错,会阻塞

print(q.qsize())

try:
    q.put_nowait(4)  # 报错queue.Full
except Exception:
    print('queue full')


print(q.get())
print(q.get())
print(q.empty())  # 不空
print(q.get())
print(q.empty())  #
print(q.get())  # 不报错,会阻塞

try:
    print(q.get_nowait())  # 报错queue.Empty

except Exception:
    print('queue empty')
先进先出队列

class queue.LifoQueue(maxsize=0) #先进后出队列(或者后进先出(LIFO: last in fisrt out)),相似于栈网络

q = queue.LifoQueue(3)  # Lifo
q.put(1)
q.put(2)
print(q.full())  # 不满
# print('当前队列内容的长度',q.qsize())
q.put(3)
print(q.full())  #
# q.put(4)  # 不报错,会阻塞

print(q.qsize())

try:
    q.put_nowait(4)  # 报错queue.Full
except Exception:
    print('queue full')


print(q.get())
print(q.get())
print(q.empty())  # 不空
print(q.get())
print(q.empty())  #
print(q.get())  # 不报错,会阻塞

try:
    print(q.get_nowait())  # 报错queue.Empty

except Exception:
    print('queue empty')
后进先出队列

class queue.PriorityQueue(maxsize=0) #优先级的队列(存储数据时可设置优先级)多线程

# 优先级队列 PriorityQueue

# put的数据是一个元组,元组的第一个参数是优先级数字(一般是数字,也能够是非数字之间的比较),数字越小优先级越高,越先被get拿到被取出来,第二个参数是put进去的值(能够是任意的数据类型)
# 若是说优先级(第一个参数)相同,那么比较值(第二个参数),值必须是相同的数据类型(不包括字典),不然报错
# 比较第二个参数:
# 若是第二个参数(或者其参数的元素)是数字: 数字==直接拿总体的数字==>比较大小,
# 若是第二个参数(或者其参数的元素)是字符串:字符串=依次取到每一个字符=>比较每一个字符的ASCII码.
q = queue.PriorityQueue(10)

q.put((-5, 'alex'))  # 放入元组,第一个元素是优先级(能够为负数,越小,优先级越高),第二个是真正的数据(数据类型随意)
q.put((2, 'blex'))
q.put((3, 'clex'))
q.put((3, '111'))

print(q.get())
print(q.get())
print(q.get())
print(q.get())
print('=======================')



q.put(('x', 123))
q.put(('y', 345))

print(q.get())
print(q.get())
print('=======================')

"""
('x', 123)
('y', 345)
"""


q.put((5, 'alex'))  # 放入元组,第一个元素是优先级(能够为负数,越小,优先级越高),第二个是真正的数据(数据类型随意)
q.put((2, 1))
q.put((3, (1,)))
# q.put((7, {1,2}))  # 优先级相同数据类型不一样,报错TypeError: '<' not supported between instances of 'dict' and 'set'
q.put((7, {1:2}))
q.put((7, {1:'a'}))  # 优先级相同数据类型都是字典,报错TypeError: '<' not supported between instances of 'dict' and 'dict'


print(q.get())
print(q.get())
print(q.get())
print(q.get())

print('=======================')
优先级队列

  

线程池(重点)

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

统一使用方式,使用threadPollExecutor和ProcessPollExecutor的方式同样,并且只要经过这个concurrent.futures导入就能够直接用他们两个了并发

concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用
Both implement the same interface, which is defined by the abstract Executor class.
二者实现相同的接口,该接口由抽象Executor类定义。

#2 基本方法
#submit(fn, *args, **kwargs)
异步提交任务(万能传参,传入的实参能够是任意数据类型,注意fn的形参数量要和这里的实参数量对应)

#map(func, *iterables, timeout=None, chunksize=1) 
取代for循环submit的操做(参数1是函数,参数2是可迭代对象)

#shutdown(wait=True)  ==>close()+join()
至关于进程池的multiprocessing.Pool().close()+multiprocessing.Pool().join()操做
wait=True,等待池内全部任务执行完毕回收完资源后才继续
wait=False,当即返回,并不会等待池内的任务执行完毕
但无论wait参数为什么值,整个程序都会等到全部任务执行完毕
submit和map必须在shutdown以前

#result(timeout=None)
取得结果(至关于pool.get())

#add_done_callback(fn)
回调函数(功能相似于pool的callback,可是显然用法不一样)
"""
multiprocessing.Pool和concurrent.futures.ThreadPoolExecutor,ProcessPoolExecutor中回调函数的区别:

进程的回调函数res = pool.apply_async(f1,args=(5,),callback=call_back_func)
(这里的callback是默认的关键字,call_back_func是自定义的回调函数名)==>做为异步对象的参数调用
线程的回调函数res = tp.submit(f1,11,12).add_done_callback(f2)
(这里的add_done_callback是默认的回调函数名,f2是自定义的回调函数)==>做为异步对象的方法调用)
"""
concurrent.futures方法

上栗子:app

import time
import os
import threading
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

def func(n):
    time.sleep(2)
    print('%s打印的:'%(threading.get_ident()),n)
    return n*n
tpool = ThreadPoolExecutor(max_workers=5) #默认通常起线程的数据不超过CPU个数*5
# tpool = ProcessPoolExecutor(max_workers=5) #进程池的使用只须要将上面的ThreadPoolExecutor改成ProcessPoolExecutor就好了,其余都不用改
#异步执行
t_lst = []
for i in range(5):
    t = tpool.submit(func,i) #提交执行函数,返回一个结果对象,i做为任务函数的参数 def submit(self, fn, *args, **kwargs):  能够传任意形式的参数
    t_lst.append(t)  #
    # print(t.result())
    #这个返回的结果对象t,不能直接去拿结果,否则又变成串行了,能够理解为拿到一个号码,等全部线程的结果都出来以后,咱们再去经过结果对象t获取结果
tpool.shutdown() #起到原来的close阻止新任务进来 + join的做用,等待全部的线程执行完毕
print('主线程')
for ti in t_lst:
    print('>>>>',ti.result())

# 咱们还能够不用shutdown(),用下面这种方式
# while 1:
#     for n,ti in enumerate(t_lst):
#         print('>>>>', ti.result(),n)
#     time.sleep(2) #每一个两秒去去一次结果,哪一个有结果了,就能够取出哪个,想表达的意思就是说不用等到全部的结果都出来再去取,能够轮询着去取结果,由于你的任务须要执行的时间很长,那么你须要等好久才能拿到结果,经过这样的方式能够将快速出来的结果先拿出来。若是有的结果对象里面尚未执行结果,那么你什么也取不到,这一点要注意,不是空的,是什么也取不到,那怎么判断我已经取出了哪个的结果,能够经过枚举enumerate来搞,记录你是哪个位置的结果对象的结果已经被取过了,取过的就再也不取了

#结果分析: 打印的结果是没有顺序的,由于到了func函数中的sleep的时候线程会切换,谁先打印就没准儿了,可是最后的咱们经过结果对象取结果的时候拿到的是有序的,由于咱们主线程进行for循环的时候,咱们是按顺序将结果对象添加到列表中的。
# 37220打印的: 0
# 32292打印的: 4
# 33444打印的: 1
# 30068打印的: 2
# 29884打印的: 3
# 主线程
# >>>> 0
# >>>> 1
# >>>> 4
# >>>> 9
# >>>> 16
栗子
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time
def f1(n,s):  # 要与 万能传参 的参数数量一致
    time.sleep(1)
    # print(n,s)
    return n * n

if __name__ == '__main__':
    tp = ThreadPoolExecutor(4) # 线程  默认的线程个数是cpu个数 * 5
    # tp = ProcessPoolExecutor(4) # 进程  默认的进程个数是cpu个数 这两个的方法一致
    # tp.map(f1, range(10))  # 异步提交任务,参数是(任务名,可迭代对象)
    res_lis = []
    for i in range(10):
        res = tp.submit(f1,i,'baobao')  # submit是给线程池异步提交任务,万能传参
        # print(res)  # <Future at 0x10617a208 state=running>

        res_lis.append(res)

    for t in res_lis:  # 4个4个的打印
        print(t.result())

    tp.shutdown()  # ==等效于==> close + join 主线程等待全部提交给线程池的任务所有执行完毕

    for t in res_lis:  # 所有一块儿打印
        print(t.result())  # 结果对象.result,#和get方法同样,若是没有结果,会等待,阻塞程序

    print('主线程')

"""
只须要将这一行代码改成下面这一行就能够了,其余的代码都不用变
tpool = ThreadPoolExecutor(max_workers=5) #默认通常起线程的数据不超过CPU个数*5
# tpool = ProcessPoolExecutor(max_workers=5)#默认通常起线程的数据不超过CPU个数

你就会发现为何将线程池和进程池都放到这一个模块里面了,由于用法同样,因此不论是线程池仍是进程池,更推荐使用这个from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
"""
ThreadPoolExecutor的简单使用
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

def func(n):
    time.sleep(2)
    return n*n

def call_back(m):
    print('结果为:%s'%(m.result()))  # 注意回调函数拿到的是线程(进程)对象,想要拿到值须要调用result方法

tpool = ThreadPoolExecutor(max_workers=5)
t_lst = []
for i in range(5):
    t = tpool.submit(func,i).add_done_callback(call_back)
    
"""
结果为:0
结果为:1
结果为:4
结果为:9
结果为:16

"""
回调函数简单应用

 

 

协程 

协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序本身控制调度的。负载均衡

须要强调的是:异步

#1. python的线程属于内核级别的,即由操做系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其余线程运行)
#2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操做系统)控制切换,以此来提高效率(!!!非io操做的切换与效率无关)

操做系统控制线程的切换 <==对比==> 用户在单线程内控制协程的切换socket

#1. 协程的切换开销更小,属于程序级别的切换,操做系统彻底感知不到,于是更加轻量级
#2. 单线程内就能够实现并发的效果,最大限度地利用cpu
协程优势
#1. 协程的本质是单线程下,没法利用多核,能够是一个程序开启多个进程,每一个进程内开启多个线程,每一个线程内开启协程
#2. 协程指的是单个线程,于是一旦协程出现阻塞,将会阻塞整个线程
协程缺点
# 1.必须在只有一个单线程里实现并发
# 2.修改共享数据不需加锁
# 3.用户程序里本身保存多个控制流的上下文栈
# 4.附加:一个协程遇到IO操做自动切换到其它协程(如何实现检测IO,yield、greenlet都没法实现,就用到了gevent模块(select机制))
协程特色总结

协程就是告诉Cpython解释器,你不是nb吗,不是搞了个GIL锁吗,那好,我就本身搞成一个线程让你去执行,省去你切换线程的时间,我本身切换比你切换要快不少,避免了不少的开销,对于单线程下,咱们不可避免程序中出现io操做,但若是咱们能在本身的程序中(即用户程序级别,而非操做系统级别)控制单线程下的多个任务能在一个任务遇到io阻塞时就切换到另一个任务去计算,这样就保证了该线程可以最大限度地处于就绪态,即随时均可以被cpu执行的状态,至关于咱们在用户程序级别将本身的io操做最大限度地隐藏起来,从而能够迷惑操做系统,让其看到:该线程好像是一直在计算,io比较少,从而更多的将cpu的执行权限分配给咱们的线程。

协程的本质就是在单线程下,由用户本身控制一个任务遇到io阻塞了就切换另一个任务去执行,以此来提高效率。为了实现它,咱们须要找寻一种能够同时知足如下条件的解决方案:

 

#1. 能够控制多个任务之间的切换,切换以前将任务的状态保存下来,以便从新运行时,能够基于暂停的位置继续执行。

#2. 做为1的补充:能够检测io操做,在遇到io操做的状况下才发生切换

  

生成器 

并发的本质:任务切换+保存状态,yield自己就是一种在单线程下能够保存任务运行状态的方法,

#1 yield能够保存状态,yield的状态保存与操做系统的保存线程状态很像,可是yield是代码级别控制的,更轻量级
#2 send能够把一个函数的结果传给另一个函数,以此实现单线程内程序之间的切换  
import time
#基于yield并发执行,多任务之间来回切换,这就是个简单的协程的体现,可是他可以节省I/O时间吗?不能,yield不能检测IO,不能实现遇到IO自动切换
def f1():
    for i in range(3):
        time.sleep(0.5)  # 发现什么?只是进行了切换,可是并无节省I/O时间
        print('f1>>',i)
        # yield

def f2():
    # g = f1()
    for i in range(3):
        time.sleep(0.5)
        print('f2>>', i)
        # next(g)

#不写yield,下面两个任务是执行完func1里面全部的程序才会执行func2里面的程序,有了yield,咱们实现了两个任务的切换+保存状态
#基于yield保存状态,实现两个任务直接来回切换,即并发的效果
#PS:若是每一个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
f1()
f2()

"""
f1>> 0
f1>> 1
f1>> 2
f2>> 0
f2>> 1
f2>> 2

有了yield:
f2>> 0
f1>> 0
f2>> 1
f1>> 1
f2>> 2
f1>> 2
生成器版协程

 

greenlet模块 

#安装==>在终端输入如下代码
pip3 install greenlet

 

import time
from greenlet import greenlet


# 真正的协程模块就是使用greenlet完成的切换
def f1(s):
    print('第一次f1==>'+s)
    g2.switch('taibai')  #切换到g2这个对象的任务去执行
    time.sleep(1)
    print('第一次f1==>'+s)
    g2.switch()
def f2(s):
    print('第一次f2==>'+s)
    g1.switch()
    time.sleep(1)
    print('第二次f2==>'+s)
g1 = greenlet(f1)  #实例化一个greenlet对象,并将任务名称做为参数传进去
g2 = greenlet(f2)
g1.switch('alex') #执行g1对象里面的任务,能够在第一次switch时传入参数,之后都不须要

"""
greenlet只是提供了一种比generator更加便捷的切换方式,当切到一个任务执行时若是遇到io,那就原地阻塞,仍然是没有解决遇到IO自动切换来提高效率的问题。
"""
greenlet版协程

通常在工做中咱们都是进程+线程+协程的方式来实现并发,以达到最好的并发效果,若是是4核的cpu,通常起5个进程,每一个进程中20个线程(5倍cpu数量),每一个线程能够起500个协程,大规模爬取页面的时候,等待网络延迟的时间的时候,咱们就能够用协程去实现并发。 并发数量 = 5 * 20 * 500 = 50000个并发,这是通常一个4cpu的机器最大的并发数。nginx在负载均衡的时候最大承载量就是5w个。 

 

gevent模块(重点)

#安装==>在终端输入如下代码
pip3 install gevent
from gevent import monkey;monkey.patch_all()  # 必须写在最上面,这句话后面的全部阻塞所有可以识别了
import gevent
import time
import threading

# 遇到IO阻塞时会自动切换任务
def f1(name):
    print(f'{name}==第一次f1')
    print(threading.current_thread().getName())  # DummyThread-1 假线程,虚拟线程
    # gevent.sleep(1)  # gevent默承认以识别的io阻塞
    time.sleep(2)  # 加上mokey就可以识别到time模块的sleep了
    print(f'{name}==第二次f1')
    return name

def f2(name):
    print(threading.current_thread().getName())  # DummyThread-2
    print(f'{name}==第一次f2')
    # gevent.sleep(2)
    time.sleep(2)  # 来回切换,直到一个I/O的时间结束,这里都是咱们的gevent作得,再也不是控制不了的操做系统了。
    print(f'{name}==第二次f2')

s = time.time()

g1 = gevent.spawn(f1,'alex') #异步提交了f1任务
g2 = gevent.spawn(f2,name='egon') #建立一个协程对象g2,spawn括号内第一个参数是函数名,如f2,后面能够有多个参数,能够是位置实参或关键字实参,都是传给函数f2的,spawn是异步提交任务
# g1.join()  # 等待g1结束,上面只是建立协程对象,这个join才是去执行
# g2.join()  # 等待g2结束  有人测试的时候会发现,不写第二个join也能执行g2,是的,协程帮你切换执行了,可是你会发现,若是g2里面的任务执行的时间长,可是不写join的话,就不会执行完等到g2剩下的任务了

gevent.joinall([g1,g2])  # 这里等价于上述join两步合做一步
print(g1.value)#拿到func1的返回值

e = time.time()
print('执行时间:',e-s)  # 测试执行时间
print('主程序任务')

"""
结果:
alex==第一次f1
DummyThread-1
DummyThread-2
egon==第一次f2
alex==第二次f1
egon==第二次f2
alex
执行时间: 2.004991054534912
主程序任务

"""

'''
# spawn是类方法,参数是万能的
@classmethod
    def spawn(cls, *args, **kwargs):  # 万能形参==>实参能够随便传入
    g = cls(*args, **kwargs)
        g.start()
        return g
'''

# 咱们能够用threading.current_thread().getName()来查看每一个g1和g2,查看的结果为DummyThread-n,即假线程,虚拟线程,其实都在一个线程里面
# 进程线程的任务切换是由操做系统自行切换的,你本身不能控制
# 协程是经过本身的程序(代码)来进行切换的,本身可以控制,只有遇到协程模块可以识别的IO操做的时候,程序才会进行任务切换,实现并发效果,若是全部程序都没有IO操做,那么就基本属于串行执行了。
gevent版协程
from gevent import spawn,joinall,monkey;monkey.patch_all()

import time
def task(pid):
    """
    Some non-deterministic task
    """
    time.sleep(0.5)
    print('Task %s done' % pid)


def synchronous():
    for i in range(10):
        task(i)

def asynchronous():
    g_l=[spawn(task,i) for i in range(10)]
    joinall(g_l)

if __name__ == '__main__':
    print('Synchronous:')
    synchronous()

    print('Asynchronous:')
    asynchronous()
#上面程序的重要部分是将task函数封装到greenlet内部线程的gevent.spawn。 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,后者阻塞当前流程,并执行全部给定的greenlet。执行流程只会在 全部greenlet执行完后才会继续向下走。


"""
# 结果:
Synchronous:同步,一个一个的打印
Task 0 done
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 8 done
Task 9 done
Asynchronous:异步,一块儿打印
Task 0 done
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 8 done
Task 9 done
"""
协程:同步异步对比

 

 

 

今日内容回顾:

1,线程的其余方法

Threading.current_thread() #当前线程对象

GetName() 获取线程名

Ident  获取线程id

 

Threading.Enumerate() #当前正在运行的线程对象的一个列表

Threading.active_count() #当前正在运行的线程数量

 

2,线程队列(重点)

Import queue

先进先出队列:queue.Queue(3)

先进后出\后进先出队列:queue.LifoQueue(3)  

优先级队列:queue.priorityQueue(3)

    Put的数据是一个元组,元组的第一个参数是优先级数字,数字越小优先级越高,越先被get到被取出来,第二个参数是put进去的值,若是说优先级相同,那么值别忘了应该是相同的数据类型,字典不行

 

3,线程池

From concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

 

P = ThreadPoolExecutor(4)  #默认的线程个数是cpu个数 * 5

P = ProcessPoolExecutor(4)  #默认的进程个数是cpu个数

P.map(f1,可迭代的对象)  #异步执行

Def f1(n1,n2):

Print(n1,n2)

P.submit(f1,11,12)  #异步提交任务

Res = P.submit(f1,11,12)

 

Res.result()  #get方法同样,若是没有结果,会等待,阻塞程序

 

Shutdown() #close+join,锁定线程池,等待线程池中全部已经提交的任务所有执行完毕

 

 

今日做业

 

  1. 多线程实现 一个socket并发聊天,就是一个服务端同时与多个客户端进行沟通
  2. 写一个简易的socketserver
  3. 经过线程池作爬虫,经过回调函数来清洗爬取回来的数据,简单写,就是将爬取回来的网页内容,经过正则来匹配一些其中的内容,匹配其中的连接网址

 

  

明天默写:

  1. 线程池的方法
  2. Gevent模块的写法
相关文章
相关标签/搜索