python select epoll poll的解析

select、poll、epoll三者的区别 python

select linux

select最先于1983年出如今4.2BSD中,它经过一个select()系统调用来监视多个文件描述符的数组(在linux中一切事物皆文件,块设备,socket链接等。),当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位(变成ready),使得进程能够得到这些文件描述符从而进行后续的读写操做(select会不断监视网络接口的某个目录下有多少文件描述符变成ready状态【在网络接口中,过来一个链接就会创建一个'文件'】,变成ready状态后,select就能够操做这个文件描述符了)。数组

【socketserver是经过多线程来处理多个请求,每一个链接过来分配一个线程来处理,可是select是单进程的,一个进程执行代码确定就是串行的,可是如今就要经过一个进程来实现并发的效果,一个进程下只有一个主线程,也就说说用一个线程实现并发的效果。为何要用一个进程实现多并发而不采用多线程实现多并发呢?缓存

==========答:由于一个进程实现多并发比多线程是实现多并发的效率还要高,由于启动多线程会有不少的开销,并且CPU要不断的检查每一个线程的状态,肯定哪一个线程是否能够执行。这个对系统来讲也是有压力的,用单进程的话就能够避免这种开销和给系统带来的压力,网络

那么单进程是如何实现多并发的呢???多线程

========答:很巧妙的使用了生产者和消费者的模式(异步),生产者和消费者能够实现非阻塞,一个socketserver经过select接收多个链接过来(以前的socket一个进程只能接收一个链接,当接收新的链接的时候产生阻塞,由于这个socket进程要先和客户端进行通讯,两者是彼此互相等待的【客户端发一条消息,服务端收到,客户端等着返回....服务端等着接收.........】一直在阻塞着,这个时候若是再来一个链接,要等以前的那个链接断了,这个才能够连进来。-----------也就是说用基本的socket实现多进程是阻塞的。为了解决这个问题采用每来一个链接产生一个线程,是不阻塞了,可是当线程数量过多的时候,对于cpu来讲开销和压力是比较大的。)对于单个socket来讲,阻塞的时候大部分的时候都是在等待IO操做(网络操做也属于IO操做)。为了不这种状况,就出现了异步=============客户端发起一个链接,会在服务端注册一个文件句柄,服务端会不断轮询这些文件句柄的列表,主进程和客户端创建链接而没有启动线程,这个时候主进程和客户端进行交互,其余的客户端是没法链接主进程的,为了实现主进程既能和已链接的客户端收发消息,又能和新的客户端创建链接,就把轮询变的很是快(死循环)去刷客户端链接进来的文件句柄的列表,只要客户端发消息了,服务端读取了消息以后,有另外一个列表去接收给客户端返回的消息,也不断的去刷这个列表,刷出来后返回给客户端,这样和客户端的此次通讯就完成了,可是跟客户端的链接尚未断,可是就进入了下一次的轮询。】并发

  

select 优势

select目前几乎在全部的平台上支持,良好跨平台性。app

 

select 缺点

  • 每次调用select,都须要把fd集合从用户态拷贝到内核态,这个开销在fd不少的时候会很大
  • 单个进程可以监视的fd数量存在最大限制,在linux上默认为1024(能够经过修改宏定义或者从新编译内核的方式提高这个限制)
  • 而且因为select的fd是放在数组中,而且每次都要线性遍历整个数组,当fd不少的时候,开销也很大

python  select 

调用select的函数为readable,writable,exceptional = select.select(rlist, wlist, xlist[, timeout]),前三个参数都分别是三个列表,数组中的对象均为waitable object:均是整数的文件描述符(file descriptor)或者一个拥有返回文件描述符方法fileno()的对象;

异步

  • rlist: 等待读就绪的list
  • wlist: 等待写就绪的list
  • errlist: 等待“异常”的list

 

select方法用来监视文件描述符,若是文件描述符发生变化,则获取该描述符。
一、这三个list能够是一个空的list,可是接收3个空的list是依赖于系统的(在Linux上是能够接受的,可是在window上是不能够的)。
二、当 rlist  序列中的描述符发生可读时(accetp和read),则获取发生变化的描述符并添加到 readable  序列中
三、当 wlist  序列中含有描述符时,则将该序列中全部的描述符添加到 writable  序列中
四、当 errlist 序列中的句柄发生错误时,则将该发生错误的句柄添加到 exceptional  序列中
五、当 超时时间 未设置,则select会一直阻塞,直到监听的描述符发生变化
    当 超时时间 =  时,那么若是监听的句柄均无任何变化,则select会阻塞  1  秒,以后返回三个空列表,若是监听的描述符(fd)有变化,则直接执行。
六、在list中能够接受Ptython的的 file对象(好比 sys.stdin,或者会被 open()os.open()返回的object),socket object将会返回 socket.socket()。也能够自定义类,只要有一个合适的 fileno()的方法(须要真实返回一个文件描述符,而不是一个随机的整数)。
 

select 示例:socket

Python的select()方法直接调用操做系统的IO接口,它监控sockets,open files, and pipes(全部带fileno()方法的文件句柄)什么时候变成readable 和writeable, 或者通讯错误,select()使得同时监控多个链接变的简单,而且这比写一个长循环来等待和监控多客户端链接要高效,由于select直接经过操做系统提供的C的网络接口进行操做,而不是经过Python的解释器

 1 #coding:UTF8
 2 
 3 import select
 4 import socket
 5 import sys
 6 import Queue
 7 
 8 #建立一个TCP/IP 进程
 9 server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
10 server.setblocking(0)
11 
12 #链接地址和端口
13 server_address = ('localhost',10000)
14 print >>sys.stderr,'starting up on %s prot %s' % server_address
15 server.bind(server_address)
16 
17 #最大容许连接数
18 server.listen(5)
19 
20 inputs = [ server ]
21 outputs = []
22 
23 message_queues = {}
24 
25 while inputs:
26     print >>sys.stderr,'\nwaiting for the next event'
27     readable,writable,exceptional = select.select(inputs,outputs,inputs)
28 
29     # Handle inputs
30     for s in readable:
31      
32         if s is server:
33             # A "readable" server socket is ready to accept a connection
34             connection, client_address = s.accept()
35             print >>sys.stderr, 'new connection from', client_address
36             #connection.setblocking(0)
37             inputs.append(connection)
38      
39             # Give the connection a queue for data we want to send
40             message_queues[connection] = Queue.Queue()
41         
42         else:
43             data = s.recv(1024)
44             if data:
45                 # A readable client socket has data
46                 print >>sys.stderr, 'received "%s" from %s' % (data, s.getpeername())
47                 message_queues[s].put(data)  #这个s至关于connection
48                 # Add output channel for response
49                 if s not in outputs:
50                     outputs.append(s)
51 
52             else:
53                 # Interpret empty result as closed connection
54                 print >>sys.stderr, 'closing', client_address, 'after reading no data'
55                 # Stop listening for input on the connection
56                 if s in outputs:
57                     outputs.remove(s)  #既然客户端都断开了,我就不用再给它返回数据了,因此这时候若是这个客户端的链接对象还在outputs列表中,就把它删掉
58                 inputs.remove(s)    #inputs中也删除掉
59                 s.close()           #把这个链接关闭掉
60                  
61                 # Remove message queue
62                 del message_queues[s]
63     
64     # Handle outputs
65     for s in writable:
66         try:
67             next_msg = message_queues[s].get_nowait()
68         except Queue.Empty:
69             # No messages waiting so stop checking for writability.
70             print >>sys.stderr, 'output queue for', s.getpeername(), 'is empty'
71             outputs.remove(s)
72         else:
73             print >>sys.stderr, 'sending "%s" to %s' % (next_msg, s.getpeername())
74             s.send(next_msg.upper())
75     # Handle "exceptional conditions"
76     for s in exceptional:
77         print >>sys.stderr, 'handling exceptional condition for', s.getpeername()
78         # Stop listening for input on the connection
79         inputs.remove(s)
80         if s in outputs:
81             outputs.remove(s)
82         s.close()
83      
84         # Remove message queue
85         del message_queues[s]     
server

 

代码解析:

 

select()方法接收并监控3个通讯列表, 第一个是全部的输入的data,就是指外部发过来的数据,第2个是监控和接收全部要发出去的data(outgoing data),第3个监控错误信息,接下来咱们须要建立2个列表来包含输入和输出信息来传给select().

# Sockets from which we expect to read
inputs = [ server ] # Sockets to which we expect to write outputs = [ ] 

 

全部客户端的进来的链接和数据将会被server的主循环程序放在上面的list中处理,咱们如今的server端须要等待链接可写(writable)以后才能过来,而后接收数据并返回(所以不是在接收到数据以后就马上返回),由于每一个链接要把输入或输出的数据先缓存到queue里,而后再由select取出来再发出去。

# Outgoing message queues (socket:Queue)
message_queues = {}

The main portion of the server program loops, calling select() to block and wait for network activity.

下面是此程序的主循环,调用select()时会阻塞和等待直到新的链接和数据进来

while inputs: # Wait for at least one of the sockets to be ready for processing print >>sys.stderr, '\nwaiting for the next event' readable, writable, exceptional = select.select(inputs, outputs, inputs) 

 当你把inputs,outputs,exceptional(这里跟inputs共用)传给select()后,它返回3个新的list,咱们上面将他们分别赋值为readable,writable,exceptional, 全部在readable list中的socket链接表明有数据可接收(recv),全部在writable list中的存放着你能够对其进行发送(send)操做的socket链接,当链接通讯出现error时会把error写到exceptional列表中。

 

Readable list 中的socket 能够有3种可能状态,第一种是若是这个socket是main "server" socket,它负责监听客户端的链接,若是这个main server socket出如今readable里,那表明这是server端已经ready来接收一个新的链接进来了,为了让这个main server能同时处理多个链接,在下面的代码里,咱们把这个main server的socket设置为非阻塞模式。

 

第二种状况是这个socket是已经创建了的链接,它把数据发了过来,这个时候你就能够经过recv()来接收它发过来的数据,而后把接收到的数据放到queue里,这样你就能够把接收到的数据再传回给客户端了。

 

第三种状况就是这个客户端已经断开了,因此你再经过recv()接收到的数据就为空了,因此这个时候你就能够把这个跟客户端的链接关闭了。

 

对于writable list中的socket,也有几种状态,若是这个客户端链接在跟它对应的queue里有数据,就把这个数据取出来再发回给这个客户端,不然就把这个链接从output list中移除,这样下一次循环select()调用时检测到outputs list中没有这个链接,那就会认为这个链接还处于非活动状态

 

最后,若是在跟某个socket链接通讯过程当中出了错误,就把这个链接对象在inputs\outputs\message_queue中都删除,再把链接关闭掉

 

 1 #coding:UTF8
 2 
 3 import socket
 4 import sys
 5  
 6 messages = [ 'This is the message. ',
 7              'It will be sent ',
 8              'in parts.',
 9              ]
10 server_address = ('localhost', 10003)
11  
12 # Create a TCP/IP socket
13 socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
14           socket.socket(socket.AF_INET, socket.SOCK_STREAM),
15           ]
16  
17 # Connect the socket to the port where the server is listening
18 print >>sys.stderr, 'connecting to %s port %s' % server_address
19 for s in socks:
20     s.connect(server_address)
21 
22 for message in messages:
23  
24     # Send messages on both sockets
25     for s in socks:
26         print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message)
27         s.send(message)
28  
29     # Read responses on both sockets
30     for s in socks:
31         data = s.recv(1024)
32         print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data)
33         if not data:
34             print >>sys.stderr, 'closing socket', s.getsockname()
client

客户端程序展现了如何经过select()对socket进行管理并与多个链接同时进行交互,经过循环经过每一个socket链接给server发送和接收数据。

server:
starting up on localhost prot 10000

waiting for the next event
new connection from ('127.0.0.1', 54812)

waiting for the next event
new connection from ('127.0.0.1', 54813)
received "This is the message. " from ('127.0.0.1', 54812)

waiting for the next event
received "This is the message. " from ('127.0.0.1', 54813)
sending "This is the message. " to ('127.0.0.1', 54812)

waiting for the next event
output queue for ('127.0.0.1', 54812) is empty
sending "This is the message. " to ('127.0.0.1', 54813)

waiting for the next event
output queue for ('127.0.0.1', 54813) is empty

waiting for the next event
received "It will be sent " from ('127.0.0.1', 54812)
received "It will be sent " from ('127.0.0.1', 54813)

waiting for the next event
sending "It will be sent " to ('127.0.0.1', 54812)
sending "It will be sent " to ('127.0.0.1', 54813)

waiting for the next event
output queue for ('127.0.0.1', 54812) is empty
output queue for ('127.0.0.1', 54813) is empty

waiting for the next event
received "in parts." from ('127.0.0.1', 54812)
received "in parts." from ('127.0.0.1', 54813)

waiting for the next event
sending "in parts." to ('127.0.0.1', 54812)
sending "in parts." to ('127.0.0.1', 54813)

waiting for the next event
output queue for ('127.0.0.1', 54812) is empty
output queue for ('127.0.0.1', 54813) is empty

waiting for the next event
closing ('127.0.0.1', 54813) after reading no data
closing ('127.0.0.1', 54813) after reading no data

waiting for the next event





client:
connecting to localhost port 10000
('127.0.0.1', 54812): sending "This is the message. "
('127.0.0.1', 54813): sending "This is the message. "
('127.0.0.1', 54812): received "THIS IS THE MESSAGE. "
('127.0.0.1', 54813): received "THIS IS THE MESSAGE. "
('127.0.0.1', 54812): sending "It will be sent "
('127.0.0.1', 54813): sending "It will be sent "
('127.0.0.1', 54812): received "IT WILL BE SENT "
('127.0.0.1', 54813): received "IT WILL BE SENT "
('127.0.0.1', 54812): sending "in parts."
('127.0.0.1', 54813): sending "in parts."
('127.0.0.1', 54812): received "IN PARTS."
('127.0.0.1', 54813): received "IN PARTS."
运行结果

 

 

原文:http://pymotw.com/2/select/ 

 

 

poll 
poll在1986年诞生于System V Release 3,它和select在本质上没有多大差异,可是poll没有最大文件描述符数量的限制。

poll和select一样存在一个缺点就是,包含大量文件描述符的数组被总体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增长而线性增大。

另外,select()和poll()将就绪的文件描述符告诉进程后,若是进程没有对其进行IO操做,那么下次调用select()和poll() 的时候将再次报告这些文件描述符,因此它们通常不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。

 

在Python中调用poll

  • select.poll(),返回一个poll的对象,支持注册和注销文件描述符。

  • poll.register(fd[, eventmask])注册一个文件描述符,注册后,能够经过poll()方法来检查是否有对应的I/O事件发生。fd能够是i 个整数,或者有返回整数的fileno()方法对象。若是File对象实现了fileno(),也能够看成参数使用。

  • eventmask是一个你想去检查的事件类型,它能够是常量POLLINPOLLPRI和 POLLOUT的组合。若是缺省,默认会去检查全部的3种事件类型。

事件常量 意义
POLLIN 有数据读取
POLLPRT 有数据紧急读取
POLLOUT 准备输出:输出不会阻塞
POLLERR 某些错误状况出现
POLLHUP 挂起
POLLNVAL 无效请求:描述没法打开
  • poll.modify(fd, eventmask) 修改一个已经存在的fd,和poll.register(fd, eventmask)有相同的做用。若是去尝试修改一个未经注册的fd,会引发一个errnoENOENTIOError
  • poll.unregister(fd)从poll对象中注销一个fd。尝试去注销一个未经注册的fd,会引发KeyError
  • poll.poll([timeout])去检测已经注册了的文件描述符。会返回一个可能为空的list,list中包含着(fd, event)这样的二元组。 fd是文件描述符, event是文件描述符对应的事件。若是返回的是一个空的list,则说明超时了且没有文件描述符有事件发生。timeout的单位是milliseconds,若是设置了timeout,系统将会等待对应的时间。若是timeout缺省或者是None,这个方法将会阻塞直到对应的poll对象有一个事件发生。
#coding: utf-8 

import select, socket

response = b"hello world"

serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversocket.bind(('localhost', 10000))
serversocket.listen(1)
serversocket.setblocking(0)

#
poll = select.poll()
poll.register(serversocket.fileno(), select.POLLIN)

connections = {}
while True:
    for fd, event in poll.poll():
        if event == select.POLLIN:
            if fd == serversocket.fileno():
                con, addr = serversocket.accept()
                poll.register(con.fileno(), select.POLLIN)
                connections[con.fileno()] = con
            else:
                con = connections[fd]
                data = con.recv(1024)
                if data:
                    poll.modify(con.fileno(), select.POLLOUT)
        elif event == select.POLLOUT:
            con = connections[fd]
            con.send(response)
            poll.unregister(con.fileno())
            con.close()
View Code

 

epoll 
直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具有了以前所说的一切优势,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。

epoll能够同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,若是咱们没有采起行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,可是代码实现至关复杂。

epoll一样只告知那些就绪的文件描述符,并且当咱们调用epoll_wait()得到就绪文件描述符时,返回的不是实际的描述符,而是一个表明 就绪描述符数量的值,你只须要去epoll指定的一个数组中依次取得相应数量的文件描述符便可,这里也使用了内存映射(mmap)技术,这样便完全省掉了 这些文件描述符在系统调用时复制的开销。

另外一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用必定的方法后,内核才对全部监视的文件描 述符进行扫描,而epoll事先经过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用相似callback的回调 机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便获得通知。

 

在Python中调用epoll

  • select.epoll([sizehint=-1])返回一个epoll对象。

  • eventmask

事件常量 意义
EPOLLIN 读就绪
EPOLLOUT 写就绪
EPOLLPRI 有数据紧急读取
EPOLLERR assoc. fd有错误状况发生
EPOLLHUP assoc. fd发生挂起
EPOLLRT 设置边缘触发(ET)(默认的是水平触发)
EPOLLONESHOT 设置为 one-short 行为,一个事件(event)被拉出后,对应的fd在内部被禁用
EPOLLRDNORM 和 EPOLLIN 相等
EPOLLRDBAND 优先读取的数据带(data band)
EPOLLWRNORM 和 EPOLLOUT 相等
EPOLLWRBAND 优先写的数据带(data band)
EPOLLMSG 忽视
  • epoll.close()关闭epoll对象的文件描述符。
  • epoll.fileno返回control fd的文件描述符number。
  • epoll.fromfd(fd)用给予的fd来建立一个epoll对象。
  • epoll.register(fd[, eventmask])在epoll对象中注册一个文件描述符。(若是文件描述符已经存在,将会引发一个IOError
  • epoll.modify(fd, eventmask)修改一个已经注册的文件描述符。
  • epoll.unregister(fd)注销一个文件描述符。
  • epoll.poll(timeout=-1[, maxevnets=-1])等待事件,timeout(float)的单位是秒(second)。
 1 #coding:Utf8
 2 import socket, select
 3 
 4 EOL1 = b'\n\n'
 5 EOL2 = b'\n\r\n'
 6 response  = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
 7 response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
 8 response += b'Hello, world!'
 9 
10 serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
11 serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
12 serversocket.bind(('localhost', 10000))
13 serversocket.listen(1)
14 serversocket.setblocking(0)
15 
16 epoll = select.epoll()
17 epoll.register(serversocket.fileno(), select.EPOLLIN)
18 
19 try:
20    connections = {}; requests = {}; responses = {}
21    while True:
22       events = epoll.poll(1)
23       for fileno, event in events:
24          if fileno == serversocket.fileno():
25             connection, address = serversocket.accept()
26             connection.setblocking(0)
27             epoll.register(connection.fileno(), select.EPOLLIN)
28             connections[connection.fileno()] = connection
29             requests[connection.fileno()] = b''
30             responses[connection.fileno()] = response
31          elif event & select.EPOLLIN:
32             requests[fileno] += connections[fileno].recv(1024)
33             if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
34                epoll.modify(fileno, select.EPOLLOUT)
35                print('-'*40 + '\n' + requests[fileno].decode()[:-2])
36          elif event & select.EPOLLOUT:
37             byteswritten = connections[fileno].send(responses[fileno])
38             responses[fileno] = responses[fileno][byteswritten:]
39             if len(responses[fileno]) == 0:
40                epoll.modify(fileno, 0)
41                connections[fileno].shutdown(socket.SHUT_RDWR)
42          elif event & select.EPOLLHUP:
43             epoll.unregister(fileno)
44             connections[fileno].close()
45             del connections[fileno]
46 finally:
47    epoll.unregister(serversocket.fileno())
48    epoll.close()
49    serversocket.close()
View Code
相关文章
相关标签/搜索