并发编程-epoll模型的探索与实践

前言

咱们知道nginx的效率很是高,能处理上万级的并发,其之因此高效离不开epoll的支持,python

epoll是什么呢?,epoll是IO模型中的一种,属于多路复用IO模型;linux

到这里你应该想到了,select,的确select也是一种多路复用的IO模型,可是其单个select最多只能同时处理1024个socket,效率实在算不上高,这时候epoll来救场了nginx

本文从阻塞IO模型的基础上展开讨论,一步步靠近epoll的实现原理,最后以一个简单的epoll案例程序做为结束面试

亲手写一个epoll,而后去虐面试官吧!服务器

在select的学习过程当中咱们知道了select 只能同时处理1024个客户端,多线程

而多线程会遇到资源瓶颈,什么才是解决高并发最有效的方式呢并发

linux中提供了epoll 这种高效的多路复用IO模型app

注意其余平台没有相应的实现因此epoll仅在linux中可用socket

程序阻塞过程分析

epoll代码实现并不复杂,可是要搞清楚其高效的原理仍是须要花一些时间的函数

咱们从最原始的阻塞模型开始分析

假设系统目前运行了三个进程 A B C

进程A正在运行一下socket程序

server = socket.socket()
server.bind(("127.0.0.1",1688))
server.listen()
server.accept()

1.系统会建立文件描述符指向一个socket对象 ,其包含了读写缓冲区,已经进行等待队列

2.当执行到accept / recv 时系统会讲进程A 从工做队列中移除

3.将进程A的引用添加到 socket对象的等待队列中

进程的唤醒

1.当网卡收到数据后会现将数据写入到缓冲区

2.发送中断信号给CPU

3.CPU执行中断程序,将数据从内核copy到socket的缓冲区

4.唤醒进程,即将进程A切换到就绪态,同时从socket的等待队列中移除这个进程引用

select监控多个socket

select的实现思路比较直接

1.先将全部socket放到一个列表中,

2.遍历这个列表将进程A 添加到每一个socket的等待队列中 而后阻塞进程

3.当数据到达时,cpu执行中断程序将数据copy给socket 同时唤醒处于等待队列中的进程A

为了防止重复添加等待队列 还须要移除已经存在的进程A

4.进程A唤醒后 因为不清楚那个socket有数据,因此须要遍历一遍全部socket列表

从上面的过程当中不难看出

1.select,须要遍历socket列表,频繁的对等待队列进行添加移除操做,

2.数据到达后还须要给变量全部socket才能获知哪些socket有数据

两个操做消耗的时间随着要监控的socket的数量增长而大大增长,

处于效率考虑才规定了最大只能监视1024个socket

epol要解决的问题

1.避免频繁的对等待队列进行操做
2.避免遍历全部socket
对于第一个问题咱们先看select的处理方式
while True:
    r_list,w_list,x_list = select.select(rlist,wlist,xlist)

每次处理完一次读写后,都须要将全部过冲重复一遍,包括移除进程,添加进程,默认就会将进程添加到等待队列,并阻塞住进程,然而等待队列的更新操做并不频繁,

因此对于第一个问题epoll采起的方案是,将对等待队列的维护和,阻塞进程这两个操做进行拆分,

相关代码以下

import socket,select
server = socket.socket()
server.bind(("127.0.0.1",1688))
server.listen(5)

#建立epoll事件对象,后续要监控的事件添加到其中
epoll = select.epoll()
#注册服务器监听fd到等待读事件集合
epoll.register(server.fileno(), select.EPOLLIN)

# 等待事件发生
while True:
    for sock,event in epoll.poll():
    pass

在epoll中register 与 unregister函数用于维护等待队列

epoll.poll则用于阻塞进程

这样一来就避免了 每次处理都须要从新操做等待队列的问题

第二个问题是select中进程没法获知哪些socket是有数据的因此须要遍历

epol为了解决这个问题,在内核中维护了一个就绪列表,

1.建立epoll对象,epoll也会对应一个文件,由文件系统管理

2.执行register时,将epoll对象 添加到socket的等待队列中

3.数据到达后,CPU执行中断程序,将数据copy给socket

4.在epoll中,中断程序接下来会执行epoll对象中的回调函数,传入就绪的socket对象

5.将socket,添加到就绪列表中

6.唤醒epoll等待队列中的进程,

进程唤醒后,因为存在就绪列表,因此不须要再遍历socket了,直接处理就绪列表便可

解决了这两个问题后,并发量获得大幅度提高,最大可同时维护上万级别的socket

epoll相关函数

import select 导入select模块

epoll = select.epoll() 建立一个epoll对象

epoll.register(文件句柄,事件类型) 注册要监控的文件句柄和事件

事件类型:

  select.EPOLLIN    可读事件

  select.EPOLLOUT   可写事件

  select.EPOLLERR   错误事件

  select.EPOLLHUP   客户端断开事件

epoll.unregister(文件句柄)   销毁文件句柄

epoll.poll(timeout)  当文件句柄发生变化,则会以列表的形式主动报告给用户进程,timeout

                     为超时时间,默认为-1,即一直等待直到文件句柄发生变化,若是指定为1

                     那么epoll每1秒汇报一次当前文件句柄的变化状况,若是无变化则返回空

epoll.fileno() 返回epoll的控制文件描述符(Return the epoll control file descriptor)

epoll.modfiy(fineno,event) fineno为文件描述符 event为事件类型  做用是修改文件描述符所对应的事件

epoll.fromfd(fileno) 从1个指定的文件描述符建立1个epoll对象

epoll.close()   关闭epoll对象的控制文件描述符

案例:

#coding:utf-8
#客户端
#建立客户端socket对象
import socket
clientsocket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
#服务端IP地址和端口号元组
server_address = ('127.0.0.1',1688)
#客户端链接指定的IP地址和端口号
clientsocket.connect(server_address)

while True:
    #输入数据
    data = raw_input('please input:')
    if data == "q":
        break
    if not data:
      continue
    #客户端发送数据
    clientsocket.send(data.encode("utf-8"))
    #客户端接收数据
    server_data = clientsocket.recv(1024)
    print ('客户端收到的数据:',server_data)
#关闭客户端socket
clientsocket.close()

服务器:

# coding:utf-8
import socket, select

server = socket.socket()
server.bind(("127.0.0.1", 1688))
server.listen(5)

msgs = []


fd_socket = {server.fileno(): server}
epoll = select.epoll()
# 注册服务器的 写就绪
epoll.register(server.fileno(), select.EPOLLIN)

while True:
    for fd, event in epoll.poll():
        sock = fd_socket[fd]
        print(fd, event)
        # 返回的是文件描述符 须要获取对应socket
        if sock == server:  # 若是是服务器 就接受请求
            client, addr = server.accept()
            # 注册客户端写就绪
            epoll.register(client.fileno(), select.EPOLLIN)
            # 添加对应关系
            fd_socket[client.fileno()] = client

        # 读就绪
        elif event == select.EPOLLIN:
            data = sock.recv(2018)
            if not data:
                # 注销事件
                epoll.unregister(fd)
                # 关闭socket
                sock.close()
                # 删除socket对应关系
                del fd_socket[fd]
                print(" somebody fuck out...")
                continue

            print(data.decode("utf-8"))
            # 读完数据 须要把数据发回去因此接下来更改成写就绪=事件
            epoll.modify(fd, select.EPOLLOUT)
            #记录数据
            msgs.append((sock,data.upper()))
        elif event == select.EPOLLOUT:
            for item in msgs[:]:
                if item[0] == sock:
                    sock.send(item[1])
                    msgs.remove(item)
            # 切换关注事件为写就绪
            epoll.modify(fd,select.EPOLLIN)

注意:上述代码只能在linux下运行,由于epoll模型是linux内核提供的,上层代码没法实现!

相关文章
相关标签/搜索