协程,又称微线程,纤程。英文名Coroutine。html
线程是系统级别的它们由操做系统调度,而协程则是程序级别的由程序根据须要本身调度。在一个线程中会有不少函数,咱们把这些函数称为子程序,在子程序执行过程当中能够中断去执行别的子程序,而别的子程序也能够中断回来继续执行以前的子程序,这个过程就称为协程。也就是说在同一线程内一段代码在执行过程当中会中断而后跳转执行别的代码,接着在以前中断的地方继续开始执行,相似与yield操做。python
协程拥有本身的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其余地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。所以:协程能保留上一次调用时的状态(即全部局部状态的一个特定组合),每次过程重入时,就至关于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。linux
协程的优势:nginx
(1)无需线程上下文切换的开销,协程避免了无心义的调度,由此能够提升性能(但也所以,程序员必须本身承担调度的责任,同时,协程也失去了标准线程使用多CPU的能力)程序员
(2)无需原子操做锁定及同步的开销web
(3)方便切换控制流,简化编程模型编程
(4)高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。因此很适合用于高并发处理。windows
协程的缺点:数组
(1)没法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程须要和进程配合才能运行在多CPU上.固然咱们平常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。缓存
(2)进行阻塞(Blocking)操做(如IO时)会阻塞掉整个程序
(1)yield实现协程效果
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/24 10:46 # @Author : Py.qi # @File : yield_xiecheng.py # @Software: PyCharm def consumer(name): print('开始吃包子...') while True: print('\033[31;1m[consumer]%s须要包子\033[0m'%name) bone = yield #接收send发送的数据 print('\033[31;1m[%s]吃了%s个包子\033[0m'%(name,bone)) def producer(obj1): obj1.send(None) #必须先发送None for i in range(3): print('\033[32;1m[producer]\033[0m正在作%s个包子'%i) obj1.send(i) if __name__ == '__main__': con1 = consumer('消费者A') #建立消费者对象 producer(con1) #output: 开始吃包子... [consumer]消费者A须要包子 [producer]正在作0个包子 [消费者A]吃了0个包子 [consumer]消费者A须要包子 [producer]正在作1个包子 [消费者A]吃了1个包子 [consumer]消费者A须要包子 [producer]正在作2个包子 [消费者A]吃了2个包子 [consumer]消费者A须要包子
(2)greenlet模块实现程序间切换执行
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/24 15:25 # @Author : Py.qi # @File : greenlet_now.py # @Software: PyCharm import greenlet def A(): print('a.....') g2.switch() #切换至B print('a....2') g2.switch() def B(): print('b.....') g1.switch() #切换至A print('b....2') g1 = greenlet.greenlet(A) #启动一个线程 g2 = greenlet.greenlet(B) g1.switch()
(3)gevent实现协程
Gevent 是一个第三方库,能够轻松经过gevent实现协程程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet所有运行在主程序操做系统进程的内部,但它们被协做式地调度。
gevent会主动识别程序内部的IO操做,当子程序遇到IO后,切换到别的子程序。若是全部的子程序都进入IO,则阻塞。
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/24 15:59 # @Author : Py.qi # @File : gevent_noe.py # @Software: PyCharm import gevent def foo(): print('running in foo') gevent.sleep(2) print('com back from bar in to foo') def bar(): print('running in bar') gevent.sleep(2) print('com back from foo in to bar') gevent.joinall([ #建立线程并行执行程序,碰到IO就切换 gevent.spawn(foo), gevent.spawn(bar), ])
线程函数同步与异步比较:
import gevent def task(pid): gevent.sleep(1) print('task %s done'%pid) def synchronous(): #同步一个线程执行函数 for i in range(1,10): task(i) def asynchronous(): #异步一个线程执行函数 threads = [gevent.spawn(task,i) for i in range(10)] gevent.joinall(threads) print('synchronous:') synchronous() #同步执行时要等待执行完后再执行 print('asynchronous:') asynchronous() #异步时遇到等待则会切换执行
爬虫异步IO阻塞切换:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/24 17:00 # @Author : Py.qi # @File : gevent_urllib.py # @Software: PyCharm from urllib import request import gevent,time from gevent import monkey monkey.patch_all() #将程序中全部IO操做作上标记使程序非阻塞状态 def url_request(url): print('get:%s'%url) resp = request.urlopen(url) data = resp.read() print('%s bytes received from %s'%(len(data),url)) async_time_start = time.time() #开始时间 gevent.joinall([ gevent.spawn(url_request,'https://www.python.org/'), gevent.spawn(url_request,'https://www.nginx.org/'), gevent.spawn(url_request,'https://www.ibm.com'), ]) print('haoshi:',time.time()-async_time_start) #总用时
协程实现多并发连接socket通讯:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/24 17:22 # @Author : Py.qi # @File : gevent_sock.py # @Software: PyCharm import socket,gevent from gevent import monkey monkey.patch_all() def server_sock(port): s = socket.socket() s.bind(('',port)) s.listen(10) while True: conn,addr = s.accept() gevent.spawn(handle_request,conn) def handle_request(conn): try: while True: data = conn.recv(1024) if not data: conn.shutdown(socket.SHUT_WR) print('recv:',data.decode()) conn.send(data) except Exception as ex: print(ex) finally: conn.close() if __name__ == '__main__': server_sock(8888) #!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/24 17:35 # @Author : Py.qi # @File : gevent_sockclient.py # @Software: PyCharm import socket HOST = 'localhost' # The remote host PORT = 8888 # The same port as used by the server s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((HOST, PORT)) while True: #msg = bytes(input(">>:"), encoding="utf8") for i in range(50): s.send('dddd'.encode()) data = s.recv(1024) # print(data) print('Received', repr(data)) s.close()
事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特色是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理,另外两种经常使用的编程范式是(单线程)同步以及多线程编程。
服务器处理模型的程序时,有如下几种模型:
在单线程同步模型中,任务按照顺序执行。若是某个任务由于I/O而阻塞,其余全部的任务都必须等待,直到它完成以后它们才能依次执行。这种明确的执行顺序和串行化处理的行为是很容易推断得出的。若是任务之间并无互相依赖的关系,但仍然须要互相等待的话这就使得程序没必要要的下降了运行速度。
在多线程版本中,这3个任务分别在独立的线程中执行。这些线程由操做系统来管理,在多处理器系统上能够并行处理,或者在单处理器系统上交错执行。这使得当某个线程阻塞在某个资源的同时其余线程得以继续执行。与完成相似功能的同步程序相比,这种方式更有效率,但程序员必须写代码来保护共享资源,防止其被多个线程同时访问。多线程程序更加难以推断,由于这类程序不得不经过线程同步机制如锁、可重入函数、线程局部存储或者其余机制来处理线程安全问题,若是实现不当就会致使出现微妙且使人痛不欲生的bug。
在事件驱动版本的程序中,3个任务交错执行,但仍然在一个单独的线程控制中。当处理I/O或者其余昂贵的操做时,注册一个回调到事件循环中,而后当I/O操做完成时继续执行。回调描述了该如何处理某个事件。事件循环轮询全部的事件,当事件到来时将它们分配给等待处理事件的回调函数。这种方式让程序尽量的得以执行而不须要用到额外的线程。事件驱动型程序比多线程程序更容易推断出行为,由于程序员不须要关心线程安全问题。
当程序中有许多任务,且任务之间高度独立(它们不须要互相通讯,或等待彼此)并且在等待事件到来时,某些任务会阻塞时事件驱动模型时个很好的选择;当应用程序须要在任务间共享可变的数据时,事件驱动模式能够更好的在单线程下处理。
网络应用程序一般都是上述特色,这使得它们可以很好的契合事件驱动编程模型。
此处要提出一个问题,就是,上面的事件驱动模型中,只要一遇到IO就注册一个事件,而后主程序就能够继续干其它的事情了,只到io处理完毕后,继续恢复以前中断的任务,这本质上是怎么实现的呢?这就涉及到select\poll\epoll异步IO
同步IO和异步IO,阻塞IO和非阻塞IO分别是什么,到底有什么区别?
不一样的人在不一样的上下文下给出的答案是不一样的。因此先限定一下本文的上下文。
本文讨论的背景是Linux环境下的network IO。
在进行解释以前,首先要说明几个概念:
进程切换
进程的阻塞
文件描述符
缓存 I/O
进程切换
为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复之前挂起的某个进程的执行。这种行为被称为进程切换。
所以能够说,任何进程都是在操做系统内核的支持下运行的,是与内核紧密相关的。
从一个进程的运行转到另外一个进程上运行,这个过程当中通过下面这些变化:
1. 保存处理器上下文,包括程序计数器和其余寄存器
2. 更新PCB信息
3. 把进程的PCB移入相应的队列,如就绪、在某事件阻塞等队列
4. 选择另外一个进程执行,并更新其PCB
5. 更新内存管理的数据结构
6. 恢复处理器上下文
进程控制块PCB(Processing Control Block),是操做系统核心中一种数据结构,主要表示进程状态。
PCB的做用是使一个在多道程序环境下不能独立运行的程序(含数据),成为一个能独立运行的基本单位或与其它进程并发执行的进程。或者说,OS是根据PCB来对并发执行的进程进行控制和管理的。 PCB一般是系统内存占用区中的一个连续存区,它存放着操做系统用于描述进程状况及控制进程运行所需的所有信息
进程的阻塞
正在执行的进程,因为期待的某些事件未发生,如请求系统资源失败、等待某种操做的完成、新数据还没有到达或无新工做作等,则由系统自动执行阻塞原语(Block),使本身由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也所以只有处于运行态的进程(得到CPU),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的。
文件描述符fd
文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。
文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每个进程所维护的该进程打开文件的记录表。
当程序打开一个现有文件或者建立一个新文件时,内核向进程返回一个文件描述符。
在程序设计中,一些涉及底层的程序编写每每会围绕着文件描述符展开。可是文件描述符这一律念每每只适用于UNIX、Linux这样的操做系统。
缓存 I/O
缓存 I/O 又被称做标准 I/O,大多数文件系统的默认 I/O 操做都是缓存 I/O。
在 Linux 的缓存 I/O 机制中,操做系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中。
数据会先被拷贝到操做系统内核的缓冲区中,而后才会从操做系统内核的缓冲区拷贝到应用程序的地址空间。
缓存 I/O 的缺点:
数据在传输过程当中须要在应用程序地址空间和内核进行屡次数据拷贝操做,这些数据拷贝操做所带来的 CPU 以及内存开销是很是大的。
对于一次IO访问(以read举例),数据会先被拷贝到操做系统内核的缓冲区中,而后才会从操做系统内核的缓冲区拷贝到应用程序的地址空间。
一个IO(如read)操做会经历如下两个阶段:
1. 等待数据准备 (Waiting for the data to be ready)
2. 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)
由于有了这两个阶段,linux系统产生了下面五种网络模式的方案。
1.阻塞 I/O(blocking IO)
2.非阻塞 I/O(nonblocking IO)
3.I/O 多路复用( IO multiplexing)
4.信号驱动 I/O( signal driven IO)
5.异步 I/O(asynchronous IO)
阻塞 I/O(blocking IO)
在linux中,默认状况下全部的socket都是blocking,一个典型的读操做流程大概是这样:
当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据(对于网络IO来讲,不少时候数据在一开始尚未到达。好比,尚未收到一个完整的UDP包。这个时候kernel就要等待足够的数据到来)。这个过程须要等待,也就是说数据被拷贝到操做系统内核的缓冲区中是须要一个过程的。而在用户进程这边,整个进程会被阻塞(固然,是进程本身选择的阻塞)。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,而后kernel返回结果,用户进程才解除block的状态,从新运行起来。
因此,blocking IO的特色就是在IO执行的两个阶段都被block了。
非阻塞 I/O(nonblocking IO)
linux下,可经过设置socket使其变为非阻塞IO。当对一个non-blocking socket执行读操做时,流程是这个样子:
当用户进程发出read操做时,若是kernel中的数据尚未准备好,那么它并不会block用户进程,而是马上返回一个error。
从用户进程角度讲 ,它发起一个read操做后,并不须要等待,而是立刻就获得了一个结果。用户进程判断结果是一个error时,它就知道数据尚未准备好,因而它能够再次发送read操做。一旦kernel中的数据准备好了,而且又再次收到了用户进程的system call,那么它立刻就将数据拷贝到了用户内存,而后返回。
因此,nonblocking IO的特色是用户进程须要不断的主动询问kernel数据好了没有。
I/O 多路复用( IO multiplexing)
IO multiplexing就是咱们说的select,poll,epoll,有些地方也称这种IO方式为event driven IO。
select/epoll的好处就在于单个process就能够同时处理多个网络链接的IO。
它的基本原理就是select,poll,epoll这个function会不断的轮询所负责的全部socket
当某个socket有数据到达了,就通知用户进程。
当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”全部select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操做,将数据从kernel拷贝到用户进程。
因此,I/O 多路复用的特色是经过一种机制使一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就能够返回。
IO多路复用和阻塞IO其实并无太大的不一样,事实上,还更差一些。由于这里须要使用两个system call (select 和 recvfrom),而阻塞IO只调用了一个system call (recvfrom)。可是,用select的优点在于它能够同时处理多个链接。
若是处理的链接数不是很高的话,使用select/epoll的web server不必定比使用多线程+阻塞IO的web server性能更好,可能延迟还更大。
select/epoll的优点并非对于单个链接能处理得更快,而是在于能处理更多的链接。
在IO multiplexing Model中,实际中,对于每个socket,通常都设置成为non-blocking
可是,如上图所示,整个用户的process实际上是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。
异步 I/O(asynchronous IO)
用户进程发起read操做以后,马上就能够开始去作其它的事。而另外一方面,从kernel的角度,当它受到一个asynchronous read以后,首先它会马上返回,因此不会对用户进程产生任何block。
而后,kernel会等待数据准备完成,而后将数据拷贝到用户内存,当这一切都完成以后,kernel会给用户进程发送一个signal,告诉它read操做完成了。
blocking和non-blocking的区别
调用blocking IO会一直block住对应的进程直到操做完成
调用non-blocking IO在kernel还准备数据的状况下会马上返回
synchronous IO和asynchronous IO的区别
synchronous I/O操做会致使请求进程被阻塞,直到I/O操做完成;
asynchronous I/O操做不会致使请求进程被阻塞;
以前所说的blocking IO,non-blocking IO,IO multiplexing都属于synchronous IO。
有人会说,non-blocking IO并无被block啊。 这里须要格外注意,定义中所指的”IO operation”是指真实的IO操做,就是例子中的recvfrom这个system call。non-blocking IO在执行recvfrom这个system call的时候,若是kernel的数据没有准备好,这时候不会block进程。可是,当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,在这段时间内,进程是被block的。
而asynchronous IO则不同,当进程发起IO 操做以后,就直接返回不再理睬了,直到kernel发送一个信号,告诉进程说IO完成。在这整个过程当中,进程彻底没有被block。
各个IO Model的比较如图所示:
经过上面的图片,能够发现non-blocking IO和asynchronous IO的区别仍是很明显的。
在non-blocking IO中,虽然进程大部分时间都不会被block,可是它仍然要求进程去主动的check,而且当数据准备完成之后,也须要进程主动的再次调用recvfrom来将数据拷贝到用户内存。
而asynchronous IO则彻底不一样。它就像是用户进程将整个IO操做交给了他人(kernel)完成,而后他人作完后发信号通知。在此期间,用户进程不须要去检查IO操做的状态,也不须要主动的去拷贝数据。
select,poll,epoll都是IO多路复用的机制。I/O多路复用就是经过一种机制使一个进程能够监视多个描述符,一旦某个描述符就绪(通常是读就绪或者写就绪),可以通知程序进行相应的读写操做。
select,poll,epoll本质上都是同步I/O,由于他们都须要在读写事件就绪后本身负责进行读写,也就是说这个读写过程是阻塞的
异步I/O则无需本身负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。
sellect、poll、epoll三者的区别 :
select:
目前支持几乎全部的平台
默认单个进程可以监视的文件描述符的数量存在最大限制,在linux上默认只支持1024个socket
能够经过修改宏定义或从新编译内核(修改系统最大支持的端口数)的方式提高这一限制
内核准备好数据后通知用户有数据了,但不告诉用户是哪一个链接有数据,用户只能经过轮询的方式来获取数据
假定select让内核监视100个socket链接,当有1个链接有数据后,内核就通知用户100个链接中有数据了
可是不告诉用户是哪一个链接有数据了,此时用户只能经过轮询的方式一个个去检查而后获取数据
这里是假定有100个socket链接,那么若是有上万个,上十万个呢?
那你就得轮询上万次,上十万次,而你所取的结果仅仅就那么1个。这样就会浪费不少没用的开销
只支持水平触发;每次调用select,都须要把fd集合从用户态拷贝到内核态,这个开销在fd不少时会很大
同时每次调用select都须要在内核遍历传递进来的全部fd,这个开销在fd不少时也会很大
poll:
与select没有本质上的差异,仅仅是没有了最大文件描述符数量的限制
只支持水平触发
只是一个过渡版本,不多用
epoll:
Linux2.6才出现的epoll,具有了select和poll的一切优势,公认为性能最好的多路IO就绪通知方法
没有最大文件描述符数量的限制
同时支持水平触发和边缘触发
不支持windows平台
内核准备好数据之后会通知用户哪一个链接有数据了
IO效率不随fd数目增长而线性降低
使用mmap加速内核与用户空间的消息传递
水平触发与边缘触发:
水平触发:将就绪的文件描述符告诉进程后,若是进程没有对其进行IO操做,那么下次调用epoll时将再次报告这些文件描述符,这种方式称为水平触发
边缘触发:只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,若是咱们没有采起行动,那么它将不会再次告知,这种方式称为边缘触发
理论上边缘触发的性能要更高一些,可是代码实现至关复杂。
select和epoll的特色:
select:
select经过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程能够得到这些文件描述符从而进行后续的读写操做。
因为网络响应时间的延迟使得大量TCP链接处于非活跃状态,但调用select()会对全部socket进行一次线性扫描,因此这也浪费了必定的开销。
epoll:
epoll一样只告知那些就绪的文件描述符,并且当咱们调用epoll_wait()得到就绪文件描述符时,返回的不是实际的描述符,而是一个表明就绪描述符数量的值,你只须要去epoll指定的一个数组中依次取得相应数量的文件描述符便可,这里也使用了内存映射(mmap)技术,这样便完全省掉了这些文件描述符在系统调用时复制的开销。
另外一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用必定的方法后,内核才对全部监视的文件描述符进行扫描,而epoll事先经过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用相似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便获得通知。
select
select(rlist,wlist,xlist,timeout=None)
select函数监视的文件描述符分3类,分别是writefds、readfds、和exceptfds。
调用后select函数会阻塞,直到有描述符就绪(有数据可读、可写、或者有except),或者超时(timeout指定等待时间,若是当即返回设为null便可),函数返回。当select函数返回后,能够经过遍历fdset,来找到就绪的描述符。
poll
int poll(struct pollfd *fds,unsigned int nfds,int timeout);
不一样于select使用三个位图来表示三个fdset的方式,poll使用一个pollfd的指针实现。
struct pollfd { int fd; /*file descriptor */ short events; /* requested events to watch */ short revents; /* returned events witnessed */ };
pollfd结构包含了要监视的event和发生的event,再也不使用select“参数-值”传递的方式。同时,pollfd并无最大数量限制(可是数量过大后性能也是会降低)。 和select函数同样,poll返回后,须要轮询pollfd来获取就绪的描述符。
从上面看,select和poll都须要在返回后,经过遍历文件描述符来获取已经就绪的socket。
事实上,同时链接的大量客户端在一时刻可能只有不多的处于就绪状态,所以随着监视的描述符数量的增加,其效率也会线性降低。
epool
epoll是在2.6内核中提出的,是以前的select和poll的加强版本。相对于select和poll来讲,epoll更加灵活,没有描述符限制。
epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。
epoll操做过程须要三个接口,分别以下:
int epoll_create(int size);//建立一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
(1)int epool_create(int size);
建立一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大,这个参数不一样于select()中的第一个参数,给出最大监听的fd+1的值,参数size并非限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议。
当建立好epoll句柄后,它就会占用一个fd值,在linux下若是查看/proc/进程id/fd/,是可以看到这个fd的,因此在使用完epoll后,必须调用close()关闭,不然可能致使fd被耗尽。
(2)int epool_ctl(int epfd,int op,int fd,struct epoll_event *event);
函数是对指定描述符fd执行op操做。
epfd:是epoll_create()的返回值。
op:表示op操做,用三个宏来表示:
添加EPOLL_CTL_ADD,删除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。
分别添加、删除和修改对fd的监听事件。
fd:是须要监听的fd(文件描述符)
epoll_event:是告诉内核须要监听什么事
(3)int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
等待epfd上的io事件,最多返回maxevents个事件。
参数events用来从内核获得事件的集合,maxevents告以内核这个events有多大,这个maxevents的值不能大于建立epoll_create()时的size,参数timeout是超时时间(毫秒,0会当即返回,-1将不肯定,也有说法说是永久阻塞)。该函数返回须要处理的事件数目,如返回0表示已超时。
一个简单的select多并发socket服务端代码:
#!/usr/bin/python #Author:sean import select import socket import queue server = socket.socket() HOST = 'localhost' PORT = 8080 print("start up %s on port: %s",% (HOST,PORT)) server.bind((HOST,PORT)) server.listen() server.setblocking(False) #不阻塞 msg_dic_queue = {} #这是一个队列字典,存放要返回给客户端的数据 inputs = [server] #inputs里存放要让内核监测的链接,这里的server是指监测server自己的链接状态 #inputs = [server,conn] outputs = [] #outputs里存放要返回给客户端的数据链接对象 while True: print("waiting for next connect...") readable,writeable,exceptional = select.select(inputs,outputs,inputs) #若是没有任何fd就绪,程序就会一直阻塞在这里 # print(readable,writeable,exceptional) for r in readable: #处理活跃的链接,每一个r就是一个socket链接对象 if r is server: #表明来了一个新链接 conn,client_addr = server.accept() print("arrived a new connect: ",client_addr) conn.setblocking(False) inputs.append(conn) #由于这个新创建的链接还没发数据来,如今就接收的话,程序就报异常了 #因此要想实现这个客户端发数据来时server端能知道,就须要让select再监测这个conn msg_dic_queue[conn] = queue.Queue() #初始化一个队列,后面存要返回给客户端的数据 else: #r不是server的话就表明是一个与客户端创建的文件描述符了 #客户端的数据过来了,在这里接收 data = r.recv(1024) if data: print("received data from [%s]: "% r.getpeername()[0],data) msg_dic_queue[r].put(data) #收到的数据先放到队列字典里,以后再返回给客户端 if r not in outputs: outputs.append(r) #放入返回的链接队列里。为了避免影响处理与其它客户端的链接,这里不马上返回数据给客户端 else: #若是收不到data就表明客户端已经断开了 print("Client is disconnect",r) if r in outputs: outputs.remove(r) #清理已断开的链接 inputs.remove(r) del msg_dic_queue[r] for w in writeable: #处理要返回给客户端的链接列表 try: next_msg = msg_dic_queue[w].get_nowait() except queue.Empty: print("client [%s]"% w.getpeername()[0],"queue is empty...") outputs.remove(w) #确保下次循环时writeable不返回已经处理完的链接 else: print("sending message to [%s]"% w.getpeername()[0],next_msg) w.send(next_msg) #返回给客户端源数据 for e in exceptional: #处理异常链接 if e in outputs: outputs.remove(e) inputs.remove(e) del msg_dic_queue[e]
select多并发socket客户端代码:
#!/usr/bin/python #Author:sean import socket msgs = [ b'This is the message. ', b'It will be sent ', b'in parts.', ] SERVER_ADDRESS = 'localhost' SERVER_PORT = 8080 # Create a few TCP/IP socket socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(500) ] # Connect the socket to the port where the server is listening print('connecting to %s port %s' % (SERVER_ADDRESS,SERVER_PORT)) for s in socks: s.connect((SERVER_ADDRESS,SERVER_PORT)) for message in msgs: # Send messages on both sockets for s in socks: print('%s: sending "%s"' % (s.getsockname(), message) ) s.send(message) # Read responses on both sockets for s in socks: data = s.recv(1024) print( '%s: received "%s"' % (s.getsockname(), data) ) if not data: print(sys.stderr, 'closing socket', s.getsockname() )
epoll多并发socket服务端代码以下:
#!/usr/bin/python #Author:sean import socket, logging import select, errno logger = logging.getLogger("network-server") def InitLog(): logger.setLevel(logging.DEBUG) fh = logging.FileHandler("network-server.log") fh.setLevel(logging.DEBUG) ch = logging.StreamHandler() ch.setLevel(logging.ERROR) formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") ch.setFormatter(formatter) fh.setFormatter(formatter) logger.addHandler(fh) logger.addHandler(ch) if __name__ == "__main__": InitLog() try: # 建立 TCP socket 做为监听 socket listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) except socket.error as msg: logger.error("create socket failed") try: # 设置 SO_REUSEADDR 选项 listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) except socket.error as msg: logger.error("setsocketopt SO_REUSEADDR failed") try: # 进行 bind -- 此处未指定 ip 地址,即 bind 了所有网卡 ip 上 listen_fd.bind(('', 8008)) except socket.error as msg: logger.error("bind failed") try: # 设置 listen 的 backlog 数 listen_fd.listen(10) except socket.error as msg: logger.error(msg) try: # 建立 epoll 句柄 epoll_fd = select.epoll() # 向 epoll 句柄中注册 监听 socket 的 可读 事件 epoll_fd.register(listen_fd.fileno(), select.EPOLLIN) except select.error as msg: logger.error(msg) connections = {} addresses = {} datalist = {} while True: # epoll 进行 fd 扫描的地方 -- 未指定超时时间则为阻塞等待 epoll_list = epoll_fd.poll() for fd, events in epoll_list: # 若为监听 fd 被激活 if fd == listen_fd.fileno(): # 进行 accept -- 得到链接上来 client 的 ip 和 port,以及 socket 句柄 conn, addr = listen_fd.accept() logger.debug("accept connection from %s, %d, fd = %d" % (addr[0], addr[1], conn.fileno())) # 将链接 socket 设置为 非阻塞 conn.setblocking(0) # 向 epoll 句柄中注册 链接 socket 的 可读 事件 epoll_fd.register(conn.fileno(), select.EPOLLIN | select.EPOLLET) # 将 conn 和 addr 信息分别保存起来 connections[conn.fileno()] = conn addresses[conn.fileno()] = addr elif select.EPOLLIN & events: # 有 可读 事件激活 datas = '' while True: try: # 从激活 fd 上 recv 10 字节数据 data = connections[fd].recv(10) # 若当前没有接收到数据,而且以前的累计数据也没有 if not data and not datas: # 从 epoll 句柄中移除该 链接 fd epoll_fd.unregister(fd) # server 侧主动关闭该 链接 fd connections[fd].close() logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1])) break else: # 将接收到的数据拼接保存在 datas 中 datas += data except socket.error as msg: # 在 非阻塞 socket 上进行 recv 须要处理 读穿 的状况 # 这里其实是利用 读穿 出 异常 的方式跳到这里进行后续处理 if msg.errno == errno.EAGAIN: logger.debug("%s receive %s" % (fd, datas)) # 将已接收数据保存起来 datalist[fd] = datas # 更新 epoll 句柄中链接d 注册事件为 可写 epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT) break else: # 出错处理 epoll_fd.unregister(fd) connections[fd].close() logger.error(msg) break elif select.EPOLLHUP & events: # 有 HUP 事件激活 epoll_fd.unregister(fd) connections[fd].close() logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1])) elif select.EPOLLOUT & events: # 有 可写 事件激活 sendLen = 0 # 经过 while 循环确保将 buf 中的数据所有发送出去 while True: # 将以前收到的数据发回 client -- 经过 sendLen 来控制发送位置 sendLen += connections[fd].send(datalist[fd][sendLen:]) # 在所有发送完毕后退出 while 循环 if sendLen == len(datalist[fd]): break # 更新 epoll 句柄中链接 fd 注册事件为 可读 epoll_fd.modify(fd, select.EPOLLIN | select.EPOLLET) else: # 其余 epoll 事件不进行处理 continue
五、python之selectors模块
selectors模块是在python3.4版本中引进的,它封装了IO多路复用中的select和epoll,可以更快,更方便的实现多并发效果。
官方文档见:https://docs.python.org/3/library/selectors.html
如下是一个selectors模块的代码示范:
#!/usr/bin/python #Author:sean import selectors import socket #selectors模块默认会用epoll,若是你的系统中没有epoll(好比windows)则会自动使用select sel = selectors.DefaultSelector() #生成一个select对象 def accept(sock, mask): conn, addr = sock.accept() # Should be ready print('accepted', conn, 'from', addr) conn.setblocking(False) #设定非阻塞 sel.register(conn, selectors.EVENT_READ, read) #新链接注册read回调函数 def read(conn, mask): data = conn.recv(1024) # Should be ready if data: print('echoing', repr(data), 'to', conn) conn.send(data) else: print('closing', conn) sel.unregister(conn) conn.close() sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen() sock.setblocking(False) sel.register(sock, selectors.EVENT_READ, accept) #把刚生成的sock链接对象注册到select链接列表中,并交给accept函数处理 while True: events = sel.select() #默认是阻塞,有活动链接就返回活动的链接列表 #这里看起来是select,其实有可能会使用epoll,若是你的系统支持epoll,那么默认就是epoll for key, mask in events: callback = key.data #去调accept函数 callback(key.fileobj, mask) #key.fileobj就是readable中的一个socket链接对象