协程介绍 python
协程:是单线程下的并发,又称微线程,是一种用户态的轻量级线程。自己并不存在,是由程序员创造的。git
须要强调的是:程序员
1. python的线程属于内核级别的,即由操做系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其余线程运行) 2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操做系统)控制切换,以此来提高效率(非io操做的切换与效率无关)
优势:
1,协程的切换开销更小,属于程序级别的切换,操做系统感知不到,于是更加轻量级;
2,单线程内就能够实现并发效果,最大限度利用cpu
缺点:
1,协程的本质是在单线程下,没法利用多核,能够一个程序开启多个进程,一个进程开启多个线程,每一个线程内开启协程。
2,协程指的是单个线程,于是一旦协程出现阻塞,就会阻塞整个线程。
greenlet
import greenlet def f1(): print(11) gr2.switch() print(22) gr2.switch() def f2(): print(33) gr1.switch() print(44) # 协程 gr1 gr1 = greenlet.greenlet(f1) # 协程 gr2 gr2 = greenlet.greenlet(f2) gr1.switch()
单纯的切换(在没有io的状况或者没有重复开辟内存空间的操做),反而会下降程序的执行速度.github
gevent
from gevent import monkey;monkey.patch_all() import gevent import time import threading def eat(): print(threading.current_thread().getName()) print(11) time.sleep(1) print(22) def play(): print(threading.current_thread().getName()) print(33) time.sleep(1) print(44) g1=gevent.spawn(eat) g2=gevent.spawn(play) gevent.joinall([g1,g2])
注意: from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,socket模块以前.web
from gevent import spawn, joinall, monkey; monkey.patch_all() import time def task(pid): time.sleep(0.5) print('Task %s done' % pid) def f1(): # 同步 for i in range(10): task(i,) def f2(): # 异步 g = [spawn(task, i) for i in range(10)] joinall(g) if __name__ == '__main__': print('f1') f1() print('f2') f2() print('DONE')
协程应用:windows
from gevent import monkey monkey.patch_all() # 之后代码中遇到IO都会自动执行greenlet的switch进行切换 import requests import gevent def get_page1(url): ret = requests.get(url) print(url,ret.content) def get_page2(url): ret = requests.get(url) print(url,ret.content) def get_page3(url): ret = requests.get(url) print(url,ret.content) gevent.joinall([ gevent.spawn(get_page1, 'https://www.python.org/'), # 协程1 gevent.spawn(get_page2, 'https://www.yahoo.com/'), # 协程2 gevent.spawn(get_page3, 'https://github.com/'), # 协程3 ])
IO多路复用 数组
I/O多路复用是指单个进程能够同时监听多个网络的链接IO,用于提高效率.网络
I/O(input/output),经过一种机制,能够监视多个文件描述,一旦描述符就绪(读就绪和写就绪),能通知程序进行相应的读写操做。本来为多进程或多线程来接收多个链接的消息变为单进程或单线程保存多个socket的状态后轮询处理。多线程
非阻塞实例:并发
import socket client = socket.socket() client.setblocking(False) # 将原来阻塞的位置变成非阻塞(报错) # 百度建立链接: 阻塞 try: client.connect(('www.baidu.com',80)) # 执行了但报错了 except BlockingIOError as e: pass # 检测到已经链接成功 # 问百度我要什么? client.sendall(b'GET /s?wd=fanbingbing HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n') # 我等着接收百度给个人回复 chunk_list = [] while True: chunk = client.recv(8096) # 将原来阻塞的位置变成非阻塞(报错) if not chunk: break chunk_list.append(chunk) body = b''.join(chunk_list) print(body.decode('utf-8'))
可是非阻塞IO模型毫不被推荐。
咱们不可否则其优势:可以在等待任务完成的时间里干其余活了(包括提交其余任务,也就是 “后台” 能够有多个任务在“”同时“”执行)。
可是也难掩其缺点:
1. 循环调用recv()将大幅度推高CPU占用率,在低配主机下极容易出现卡机状况 2. 任务完成的响应延迟增大了,由于每过一段时间才去轮询一次read操做,而任务可能在两次轮询之间的任意时间完成。这会致使总体数据吞吐量的下降。
select
select是经过系统调用来监视一组由多个文件描述符组成的数组,经过调用select(),就绪的文件描述符会被内核标记出来,而后进程就能够得到这些文件描述符,进行相应的读写操做.
执行过程:
1,select须要提供要监控的数组,而后由用户态拷贝到内核态
2,内核态线性循环监控数组,每次都须要遍历整个数组
3,内核发现文件状态符符合操做结果将其返回
注意:对于要监控的socket都要设置为非阻塞的
python中使用select
r,w,e=select.selct(rlist,wlist,errlist,[timeout])
rlist,wlist,errlist均是waitable object;都是文件描述符,就是一个整数,或者拥有一个返回文件描述符的函数fileno的对象.
rlist:等待读就绪的文件描述符数组
wlist:等待写就绪的文件描述符数组
errlist:等待异常的数组
当rlist数组中的文件描述符发生可读时,(调用accept或者read函数),则获取文件描述符并添加到r数组中.
当wlist数组中的文件描述符发生可写时,则获取文件描述符添加到w数组中
当errlist数组中的的文件描述符发生错误时,将会添加到e队列中.
select的实例:
import socket import select client1 = socket.socket() client1.setblocking(False) # 百度建立链接: 非阻塞 try: client1.connect(('www.baidu.com',80)) except BlockingIOError as e: pass client2 = socket.socket() client2.setblocking(False) # 百度建立链接: 非阻塞 try: client2.connect(('www.sogou.com',80)) except BlockingIOError as e: pass socket_list = [client1,client2] conn_list = [client1,client2] while True: rlist,wlist,elist = select.select(socket_list,conn_list,[],0.005) # wlist中表示已经链接成功的socket对象 for sk in wlist: if sk == client1: sk.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n') elif sk==client2: sk.sendall(b'GET /web?query=fdf HTTP/1.0\r\nhost:www.sogou.com\r\n\r\n') for sk in rlist: chunk_list = [] while True: try: chunk = sk.recv(8096) if not chunk: break chunk_list.append(chunk) except BlockingIOError as e: break body = b''.join(chunk_list) # print(body.decode('utf-8')) print(body) sk.close() socket_list.remove(sk) if not socket_list: break
import socket import select class Req(object): def __init__(self,sk,func): self.sock = sk self.func = func def fileno(self): return self.sock.fileno() class Nb(object): def __init__(self): self.conn_list = [] self.socket_list = [] def add(self,url,func): client = socket.socket() client.setblocking(False) # 非阻塞 try: client.connect((url, 80)) except BlockingIOError as e: pass obj = Req(client,func) self.conn_list.append(obj) self.socket_list.append(obj) def run(self): while True: rlist,wlist,elist = select.select(self.socket_list,self.conn_list,[],0.005) # wlist中表示已经链接成功的req对象 for sk in wlist: # 发生变换的req对象 sk.sock.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n') self.conn_list.remove(sk) for sk in rlist: chunk_list = [] while True: try: chunk = sk.sock.recv(8096) if not chunk: break chunk_list.append(chunk) except BlockingIOError as e: break body = b''.join(chunk_list) # print(body.decode('utf-8')) sk.func(body) sk.sock.close() self.socket_list.remove(sk) if not self.socket_list: break def baidu_repsonse(body): print('百度下载结果:',body) def sogou_repsonse(body): print('搜狗下载结果:', body) def google_repsonse(body): print('谷歌下载结果:', body) t1 = Nb() t1.add('www.baidu.com',baidu_repsonse) t1.add('www.sogou.com',sogou_repsonse) t1.add('www.google.com',google_repsonse) t1.run()
select优势:能够跨平台使用。
缺点:1,每次调用select,都须要把fd集合由用户态拷贝到内核态,在fd多的时候开销会很大。
2,每次select都是线性遍历整个整个列表,在fd很大的时候遍历开销也很大。
操做系统检测socket是否发生变化,有三种模式(后二者在windows上不支持):
select:最多1024个socket;循环去检测。
poll:不限制监听socket个数;循环去检测(水平触发)。
epoll:不限制监听socket个数;回调方式(边缘触发)。