Select\Poll\Epoll异步IO与事件驱动

事件驱动与异步IO

事件驱动编程是一种编程规范,这里程序的执行流由外部事件来规定。它的特色是包含一个事件循环,但外部事件发生时使用回调机制来触发响应的处理。另外两种常见的编程规范是(单线程)同步以及多线程编程。python

 

 

在单线程同步模型中,任务按照顺序执行。若是某个任务由于IO而阻塞,其余全部的任务都必须等待,直到它完成以后它们才能依次执行。这种明确的执行顺序和串行化处理的行为是很容易推断出来的。若是任务之间并无相互依赖的关系,但仍然须要相互等待的话这就使得程序没必要要的下降了运行速度。程序员

 

在多线程版本中,2个任务分别在独立的线程中执行。这些线程由操做系统来管理,在多处理器系统上能够并行处理,或者在单处理器系统上交错执行。这使得当某个线程阻塞在某个资源的同时其余线程得以继续执行。与完成相似功能的同步程序相比,这种方式更有效率,但程序必须写代码来保护共享资源,防止其余被多线程同时访问。多线程程序更加难以推断,由于这类程序不得不经过线程同步机制加锁,可重入函数,线程局部存储或其余机制来处理线程安全问题。编程

 

在事件驱动版本的程序中,3个任务交错执行,但仍然在一个单独的线程控制中。当处理IO或者其余昂贵的操做时,注册一个回调到事件循环中,而后当IO操做完成时继续执行。回调描述了该如何处理某个事件。事件循环轮询全部的事件,当事件来到时将它们分配给等待处理事件的回调函数。这种方式让程序尽量的得以执行而不须要用到额外的线程。事件驱动型程序比多线程程序更容易推断出行为,由于程序员不须要关系线程安全问题。数组

 

事件驱动模型适用的环境:缓存

  • 程序中有许多任务,并且任务之间不存在因果联系
  • 任务之间高度独立(由于它们不须要相互通讯,或着等待彼此)
  • 在等待事件到来时,某些任务会阻塞

 

 

Select \ Poll \ Epoll 异步IO

三者的区别:

select

select最先于1983年出如今4.2BSD中,它经过一个select() 系统调用来监控多个文件描述符的数组,当select() 返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程能够得到这些文件描述符从而进行后续的读写操做。安全

select目前几乎在全部的平台上支持,其良好跨平台支持也是它的一个优势,事实上从如今来看,这也是它所剩很少的优势之一。服务器

select的缺点在于单个进程可以监视的文件描述符的数量存在有最大限制,在Linux上通常为1024,不过能够经过修改宏定义甚至从新编译内核的方式提高这一限制。网络

另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增加。同时,因为网络响应时间的延迟使得大量TCP链接处于非活跃状态,但调用select()会对全部socket进行一次线性扫描,因此这也浪费了必定开销。数据结构

 

poll

poll在1986年诞生于System V Release 3,它和select在本质上没有太大差异,可是poll没有最大文件描述符数量的限制。多线程

poll和select一样存在一个缺点就是,包含大量文件描述符的数组被总体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增长而线性增大。

另外,select()和poll() 将就绪的文件描述符告诉进程后,若是进程没有对其进行IO操做,那么下次调用select()和poll()的时候将再次报告这些文件描述符,因此它们通常不会丢失就绪的消息,这种方式称为水平触发。

 

epoll

直到Linux2.6才出现了由内核直接支持的实现方式,就是epoll,它几乎具有了以前所说的一切优势,被公认为Linux 2.6下性能最好的多路IO就绪通知方式。

epoll能够同时支持水平触发和边缘触发(只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,若是没有采起行动,那么它就不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要高一些,可是代码实现至关复杂。

epoll一样只告知那些就绪的文件描述符,并且当咱们调用epoll_wait()得到就绪文件描述符时,返回的不是实际的描述符,而是一个表明就绪描述符数量的值,你只须要去epoll指定的一个数组中依次取得相应数量的文件描述符便可,这里也使用了内存映射(mmap)技术,这样便完全省掉了这些文件描述符在系统调用时复制的开销。

另外一个本质的改进在于epoll采用基于事件的就绪通知方式,在select、poll中,进程只有在调用必定的方法后,内核才对全部监视的文件描述符进行扫描,而epoll事先经过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类是callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便获得通知。

 

 

 

python select

python的select() 方法直接调用操做系统的IO接口,它监控sockets,open files,and pipes(全部带fileno()方法的文件句柄)什么时候变成 readable 和 writeable 或者通讯错误,select()使得同时监控多个链接变的简单,而且这比写一个长循环来等待和监控多客户端链接要高效,由于select直接经过操做系统提供的C的网络接口进行操做,而不是经过python的解释器。

下面经过echo server例子了解select是如何经过单进程实现同时处理多个非阻塞的scoket链接的

import select
import socket
import sys
import Queue
 
# Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)
 
# Bind the socket to the port
server_address = ('localhost', 10000)
print >>sys.stderr, 'starting up on %s port %s' % server_address
server.bind(server_address)
 
# Listen for incoming connections
server.listen(5)

slect()方法接收并监控3个通讯列表,第一个是全部的输入的data,就是指外部发过来的数据,第2个是监控和接收全部要法术的data(outgoing data),第3个监控错误信息,接下来须要建立2个列表来包含输入和输出信息来传递给select()

# Sockets from which we expect to read
inputs = [ server ]

# Sockets to which we expect to write
outputs = [ ]

全部客户端的进来的链接和数据将会被server的主循环程序放在上面的list中处理,咱们如今的server端须要等待链接可写(writable)以后才能过来,而后接收数据并返回(所以不是在接收到数据以后马上返回),由于每一个链接要把输入或输出的数据先缓存到queue里,而后再由select取出来再发出去。

# Outgoing message queues (socket:Queue)
message_queues = {}

下面是此程序的主循环,调用select()时会阻塞和等待直到新的链接和数据进来

while inputs:

    # Wait for at least one of the sockets to be ready for processing
    print >>sys.stderr, '\nwaiting for the next event'
    readable, writable, exceptional = select.select(inputs, outputs, inputs)

当你把 inputs,outputs,exceptional(这里跟inputs共用)传给select()后,它返回3个新的list,咱们上面将它们分别赋值为 readable , writable ,execptional,全部在readable list中的socket链接表明有数据可接收(recv),全部在 writable list中的存放着能够对其进行发送(send)操做的socket 链接,当链接通讯出现error时会把error写到 exceptional 列表中。

 

readable list中的socket能够有3种可能状态

第一种是若是这个socket是 main “server” socket,它负责监听客户端的链接,若是 main server socket 出如今readable里,表明这个server端已经ready来接收一个新的链接进来了,为了让这个 main server 能同时处理多个链接,在下面的代码里,咱们把这个main server 的socket设置为非阻塞模式。

# Handle inputs
for s in readable:
 
    if s is server:
        # A "readable" server socket is ready to accept a connection
        connection, client_address = s.accept()
        print >>sys.stderr, 'new connection from', client_address
        connection.setblocking(0)
        inputs.append(connection)
 
        # Give the connection a queue for data we want to send
        message_queues[connection] = Queue.Queue()

第二中状况是这个socket是已经创建的链接,它把数据发了过来,这个时候你就能够经过recv() 来接收它发过来的数据,而后把接收到的数据放到queue里,这样就能够把接收到的数据再传回给客户端了。

else:
     data = s.recv(1024)
     if data:
         # A readable client socket has data
         print >>sys.stderr, 'received "%s" from %s' % (data, s.getpeername())
         message_queues[s].put(data)
         # Add output channel for response
         if s not in outputs:
             outputs.append(s)

第三种状况就是这个客户端已经断开了,因此再经过recv() 接收到的数据就是为空了,因此这个时候就能够把这个跟客户端的链接关闭了。

 

else:
    # Interpret empty result as closed connection
    print >>sys.stderr, 'closing', client_address, 'after reading no data'
    # Stop listening for input on the connection
    if s in outputs:
        outputs.remove(s)  #既然客户端都断开了,我就不用再给它返回数据了,因此这时候若是这个客户端的链接对象还在outputs列表中,就把它删掉
    inputs.remove(s)    #inputs中也删除掉
    s.close()           #把这个链接关闭掉
 
    # Remove message queue
    del message_queues[s]

对于writable list中的socket,也有几种状态,若是这个客户端链接在跟它对应的queue里面的数据,就把这个数据取出来再发回给这个客户端,是否就把这个链接从 output list 中移除,这样下一次循环select()调用时检测到 outputs list中没有这个链接,那就会认为这个链接还处于非活动状态。

# Handle outputs
for s in writable:
    try:
        next_msg = message_queues[s].get_nowait()
    except Queue.Empty:
        # No messages waiting so stop checking for writability.
        print >>sys.stderr, 'output queue for', s.getpeername(), 'is empty'
        outputs.remove(s)
    else:
        print >>sys.stderr, 'sending "%s" to %s' % (next_msg, s.getpeername())
        s.send(next_msg)

组后,若是在跟某个socket链接通讯过程当中出现了错误,就把这个链接对象在 inputs、outputs、message_queue中都删除,再把链接关闭掉

# Handle "exceptional conditions"
for s in exceptional:
    print >>sys.stderr, 'handling exceptional condition for', s.getpeername()
    # Stop listening for input on the connection
    inputs.remove(s)
    if s in outputs:
        outputs.remove(s)
    s.close()
 
    # Remove message queue
    del message_queues[s]

 

 

最后服务器端的完整代码:

import select
import socket
import sys
import queue
 
# Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
 
# Bind the socket to the port
server_address = ('localhost', 10000)
print(sys.stderr, 'starting up on %s port %s' % server_address)
server.bind(server_address)
 
# Listen for incoming connections
server.listen(5)
 
# Sockets from which we expect to read
inputs = [ server ]
 
# Sockets to which we expect to write
outputs = [ ]
 
message_queues = {}
while inputs:
 
    # Wait for at least one of the sockets to be ready for processing
    print( '\nwaiting for the next event')
    readable, writable, exceptional = select.select(inputs, outputs, inputs)
    # Handle inputs
    for s in readable:
 
        if s is server:
            # A "readable" server socket is ready to accept a connection
            connection, client_address = s.accept()
            print('new connection from', client_address)
            connection.setblocking(False)
            inputs.append(connection)
 
            # Give the connection a queue for data we want to send
            message_queues[connection] = queue.Queue()
        else:
            data = s.recv(1024)
            if data:
                # A readable client socket has data
                print(sys.stderr, 'received "%s" from %s' % (data, s.getpeername()) )
                message_queues[s].put(data)
                # Add output channel for response
                if s not in outputs:
                    outputs.append(s)
            else:
                # Interpret empty result as closed connection
                print('closing', client_address, 'after reading no data')
                # Stop listening for input on the connection
                if s in outputs:
                    outputs.remove(s)  #既然客户端都断开了,我就不用再给它返回数据了,因此这时候若是这个客户端的链接对象还在outputs列表中,就把它删掉
                inputs.remove(s)    #inputs中也删除掉
                s.close()           #把这个链接关闭掉
 
                # Remove message queue
                del message_queues[s]
    # Handle outputs
    for s in writable:
        try:
            next_msg = message_queues[s].get_nowait()
        except queue.Empty:
            # No messages waiting so stop checking for writability.
            print('output queue for', s.getpeername(), 'is empty')
            outputs.remove(s)
        else:
            print( 'sending "%s" to %s' % (next_msg, s.getpeername()))
            s.send(next_msg)
    # Handle "exceptional conditions"
    for s in exceptional:
        print('handling exceptional condition for', s.getpeername() )
        # Stop listening for input on the connection
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
 
        # Remove message queue
        del message_queues[s]

 

客户端完整代码:

import socket
import sys
 
messages = [ 'This is the message. ',
             'It will be sent ',
             'in parts.',
             ]
server_address = ('localhost', 10000)
 
# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          ]
 
# Connect the socket to the port where the server is listening
print >>sys.stderr, 'connecting to %s port %s' % server_address
for s in socks:
    s.connect(server_address)
 
for message in messages:
 
    # Send messages on both sockets
    for s in socks:
        print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message)
        s.send(message)
 
    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data)
        if not data:
            print >>sys.stderr, 'closing socket', s.getsockname()
            s.close()
相关文章
相关标签/搜索