UDP的epoll并发框架—解决OpenUOM的并发问题

UDP具备是一种很好的封装协议,好比OpenUOM使用UDP封装会比TCP好不少,如今愈来愈多的业务采用UDP传输,而后本身定义按序到达以及流控逻辑,然而就我我的的使用经验来看,UDP太难作并发,大多数状况下,使用UDP会让epoll等高性能event机制优点全无。本文以OpenUOM为例,说明一下我是怎么解决UDP并发问题的。nginx

异步并发模型与epoll

和apache相比,nginx采用异步的处理方式,也就是说,一个线程能够处理多个链接,基于event模型,来了个数据包就读,可能依次到达的数据不属于同一个链接,可是不要紧,只要能将可读的socket描述符和具体的链接对应上便可。这样会使得在大并发场景下,让CPU逼近其极限运转,由于它几乎没有时间闲着,它会一直处理到达的数据包。apache的模型就不是这样,它会让一个链接单独占有一个线程,若是有大量的链接就会有大量的线程,然而对于每个线程而言,其数据读写的压力并非很大,这就会致使大量线程之间频繁切换,而切换会致使cache的刷新等反作用...所以在一样的硬件配置情形下,nginx的异步模型要比apache好不少。golang

咱们已经知道,异步处理是搞定大并发的根本,接下来的问题是,如何让一个就绪的socket和一个业务逻辑链接对应起来,这个问题在同步模型下并不存在,由于一个线程只处理一个链接。曾经的event机制好比select,poll,它们只能告诉你socket n就绪了,你不得不本身去经过数据结构来组织socket n和该链接信息之间的关系,典型的以下:算法

struct conn {
    int sd;
    void *others;
};

list conns;

一个链表conns囊括了该线程负责的全部链接,若是select/poll告诉你socket n就绪了,你不得不遍历这个conns链表,比较谁的sd是n,而后取出conn来处理,虽然能够用更加高效的数据结构,可是查找是必不可少的。然而epoll解决了这个问题。apache

在调用epoll_ctrl将一个socket加入到epoll中时,API会为你提供一个指针,让你直接绑定一个socket描述符和一个指针,一旦socket就绪,取出的是一个结构体,其中包含了与该socket对应的指针,所以你即可以这么作:编程

conn.sd = sd;
conn.others = all;
ev.events = EPOLLIN;
ev.data.ptr = &conn;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, sd, &ev);
while (1) { 
    nfds = epoll_wait(kdpfd, events, 10000, -1);
    for (n = 0; n < nfds; ++n) {
        conn = events[n].data.ptr;
        recv(conn.sd, ....);
        ....
    }
}

conn会一会儿取出来。这是合理的方式。毕竟,内核中已经通过socket查找了,一个5元组惟一表明了一个链接,为什么要在用户态程序再找一次呢?所以除了epoll不须要遍历全部的被监视socket以外,能够保存用户的指针也是其相对于select/poll的一大优点。nginx正是用的这种方式。咱们回到OpenUOM。服务器

使用TCP的OpenUOM

使用TCP的OpenUOM跟nginx几乎是如出一辙,其核心处理逻辑以下:数据结构

/* 加入侦听socket */
context.sd = listener;
context.others = dont_care;
listen_ev.events = EPOLLIN;
listen_ev.data.ptr = context;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &listen_ev);

/* 加入TUN网卡 */
tun.sd = tun;
tun.others = dont_care;
entry.ptr = tun;
entry.type = TUN;
tun_ev.events = EPOLLIN;
tun_ev.data.ptr = entry;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, tun, &tun_ev);

while(1) {
    nfds = epoll_wait(kdpfd, events, 10000, -1);
    for (n = 0; n < nfds; ++n) {
        if (events[n].data.ptr == context) {
            child_sd = accept(context.sd, remote_addr....);
            multi_instance *mi = create_mi(child_sd, remote_addr, ...);
            entry.ptr = mi;
            entry.type = SOCKET;
            new_ev.events = EPOLLIN;
            new_ev.data.ptr = entry;
            epoll_ctl(kdpfd, EPOLL_CTL_ADD, child_sd, &new_ev);
            ....
        } else if (events[n].data.ptr.type == SOCKET){
            multi_instance *mi = events[n].data.ptr;
            data = read_from_socket(mi);
            // 这里简化了处理,由于并非每个数据包都是须要加密解密的,还有控制通道的包
            decrypt(mi, data);
            write_to_tun(data);
        } else {
            tun *tun = events[n].data.ptr.ptr;
            packet = read_from_tun(tun);
            lock(mi_hashtable);
            multi_instance *mi = lookup_multi_instance_from(packet);
            unlock(mi_hashtable);
            encrypt(packet);
            write_to_socket(packet, mi);
        }
    }
    ...
}

以上就是TCP模式下的OpenUOM所有逻辑,能够看到,若是socket可读,那么就能够直接取到multi_instance,而后顺序处理就是了。我记得去年我就把OpenUOM改为多线程了,可是如今看来那是个失败的作法。若是使用TCP,从上述逻辑能够看到,就算使用多线程,在socket-to-tun这个路径上也不用加锁,所以multi_instance直接经过epoll_wait就能够取的到。多线程

须要C/C++ Linux服务器架构师学习资料加群812855908(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享架构

UDP的epoll并发框架—解决OpenUOM的并发问题

使用UDP的OpenUOM

然而对于UDP而言,OpenUOM的处理逻辑根上面TCP的逻辑就大相径庭了。由于全程只有一个UDP socket,接受全部客户端的链接,此时根本不存在什么多路复用的问题,充其量也就是那惟一的UDP socket和tun网卡字符设备两者之间的两路复用,使用epoll彻底没有必要。为了定位了具体的multi_instance,你不得不先去read惟一的那个UDP socket,而后根据recvfrom返回参数中的sockaddr结构体来构造4元组,而后根据这4元组在全局的multi_instance hash表中去查找具体multi_instance实例。其逻辑以下所示:并发

/* 加入惟一的UDP socket */
context.sd = udp_sd;
context.others = dont_care;
listen_ev.events = EPOLLIN;
listen_ev.data.ptr = context;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &listen_ev);

/* 加入TUN网卡 */
tun.sd = tun;
tun.others = dont_care;
entry.ptr = tun;
entry.type = TUN;
tun_ev.events = EPOLLIN;
tun_ev.data.ptr = entry;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, tun, &tun_ev);

while(1) {
    nfds = epoll_wait(kdpfd, events, 10000, -1);
    for (n = 0; n < nfds; ++n) {  //实际上nfds最多也就是2
        if (events[n].data.ptr == context) {
            data = recvfrom(context.sd, remote_addr....);
            lock(mi_hashtable);   //若是多线程,这个锁将会成为瓶颈,即使是RW锁也同样
            multi_instance *mi = lookup_mi(child_sd, remote_addr, ...);  //再好的hash算法,也不是0成本的!
            unlock(mi_hashtable);
            // 这里简化了处理,由于并非每个数据包都是须要加密解密的,还有控制通道的包
            decrypt(mi, data);
            write_to_tun(data);  
            ....
        } else {
            tun *tun = events[n].data.ptr.ptr;
            packet = read_from_tun(tun);
            lock(mi_hashtable);
            multi_instance *mi = lookup_multi_instance_from(packet);
            unlock(mi_hashtable);
            encrypt(packet);
            write_to_socket(packet, mi);
        }
    }
    ...
}

可见,TCP的OpenUOM和UDP的OpenUOM处理方式彻底不一样,UDP的问题在于,彻底没有充分利用epoll的多路复用机制,不得不根据数据包的recvfrom返回地址来查找multi_instance...

让UDP socket也Listen起来

若是UDP也能像TCP同样,每个用户接进来就为之建立一个单独的socket为其专门服务该多好,这样在大并发的时候,就能够充分复用内核UDP层的socket查找结论加上epoll的通知机制了。理论上这是可行的,由于UDP的4元组能够惟一识别一个与之通讯的客户端,虽然UDP生成无链接,不可靠,可是为每个链接的客户端建立一个socket并无破坏UDP的语义,只是改变了UDP的编程模型而已,内核协议栈依然不会去刻意维护一个UDP链接,也不会进行任何的数据确认。
须要说明的是,这种方案仅仅对“长链接”的UDP有意义,好比OpenUOM这类。由于UDP是没有链接的,那么你也就不知道一个客户端何时会永远中止发送数据,所以必然要经过定时器来定时关闭那些在必定时间段内没有数据的socket。
为了验证可行性,我先在用户态作实验,也就是说,接受一个客户端的“链接请求”(其实就是一个数据包)时,我手工为其建立一个socket,而后bind本地地址,而且connect从recvfrom返回的对端地址,这样理论上对于后续的数据包,epoll都应该触发这个新的socket,毕竟它更精确。事实是否是这样呢?如下的程序能够证实:

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <pthread.h>
#include <assert.h>

#define SO_REUSEPORT    15

#define MAXBUF 10240
#define MAXEPOLLSIZE 100

int flag = 0;

int read_data(int sd) {
    char recvbuf[MAXBUF + 1];
    int  ret;
    struct sockaddr_in client_addr;
    socklen_t cli_len=sizeof(client_addr);

    bzero(recvbuf, MAXBUF + 1);
  
    ret = recvfrom(sd, recvbuf, MAXBUF, 0, (struct sockaddr *)&client_addr, &cli_len);
    if (ret > 0) {
        printf("read[%d]: %s  from  %dn", ret, recvbuf, sd);
    } else {
        printf("read err:%s  %dn", strerror(errno), ret);
      
    }
    fflush(stdout);
}

int udp_accept(int sd, struct sockaddr_in my_addr) {
    int new_sd = -1;
    int ret = 0;
    int reuse = 1;
    char buf[16];
    struct sockaddr_in peer_addr;
    socklen_t cli_len = sizeof(peer_addr);

    ret = recvfrom(sd, buf, 16, 0, (struct sockaddr *)&peer_addr, &cli_len);
    if (ret > 0) {
    }

    if ((new_sd = socket(PF_INET, SOCK_DGRAM, 0)) == -1) {
        perror("child socket");
        exit(1);
    } else {
        printf("parent:%d  new:%dn", sd, new_sd);
    }

    ret = setsockopt(new_sd, SOL_SOCKET, SO_REUSEADDR, &reuse,sizeof(reuse));
    if (ret) {
        exit(1);
    }

    ret = setsockopt(new_sd, SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse));
    if (ret) {
        exit(1);
    }

    ret = bind(new_sd, (struct sockaddr *) &my_addr, sizeof(struct sockaddr));
    if (ret){
        perror("chid bind");
        exit(1);
    } else {
    }

    peer_addr.sin_family = PF_INET;
    printf("aaa:%sn", inet_ntoa(peer_addr.sin_addr));
    if (connect(new_sd, (struct sockaddr *) &peer_addr, sizeof(struct sockaddr)) == -1) {
        perror("chid connect");
        exit(1);
    } else {
    }

out:
    return new_sd;
}

int main(int argc, char **argv) {
    int listener, kdpfd, nfds, n, curfds;
    socklen_t len;
    struct sockaddr_in my_addr, their_addr;
    unsigned int port;
    struct epoll_event ev;
    struct epoll_event events[MAXEPOLLSIZE];
    int opt = 1;;
    int ret = 0;

    port = 1234;
  
    if ((listener = socket(PF_INET, SOCK_DGRAM, 0)) == -1) {
        perror("socket");
        exit(1);
    } else {
        printf("socket OKn");
    }

    ret = setsockopt(listener,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
    if (ret) {
        exit(1);
    }

    ret = setsockopt(listener, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt));
    if (ret) {
        exit(1);
    }
  
    bzero(&my_addr, sizeof(my_addr));
    my_addr.sin_family = PF_INET;
    my_addr.sin_port = htons(port);
    my_addr.sin_addr.s_addr = INADDR_ANY;
    if (bind(listener, (struct sockaddr *) &my_addr, sizeof(struct sockaddr)) == -1) {
        perror("bind");
        exit(1);
    } else {
        printf("IP bind OKn");
    }
  
    kdpfd = epoll_create(MAXEPOLLSIZE);

    ev.events = EPOLLIN|EPOLLET;
    ev.data.fd = listener;

    if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &ev) < 0) {
        fprintf(stderr, "epoll set insertion error: fd=%dn", listener);
        return -1;
    } else {
        printf("ep add OKn");
    }
 
    while (1) {
      
        nfds = epoll_wait(kdpfd, events, 10000, -1);
        if (nfds == -1) {
            perror("epoll_wait");
            break;
        }
      
        for (n = 0; n < nfds; ++n) {
            if (events[n].data.fd == listener) {
                printf("listener:%dn", n);
                int new_sd;               
                struct epoll_event child_ev;

                new_sd = udp_accept(listener, my_addr);
                child_ev.events = EPOLLIN;
                child_ev.data.fd = new_sd;
                if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, new_sd, &child_ev) < 0) {
                    fprintf(stderr, "epoll set insertion error: fd=%dn", new_sd);
                    return -1;
                }
            } else {
                read_data(events[n].data.fd);
            }
        }
    }
    close(listener);
    return 0;
}

须要说明的是,REUSEPORT是必要的,由于在connect以前,你必须为新建的socket bind跟listener同样的IP地址和端口,所以就须要这个socket选项。
此时,若是你用多个udp客户端去给这个服务端发数据,会发现彻底实现了想要的效果。

内核中的UDP Listener

虽然在用户态能够实现效果,可是编程模型并不太好用,为了建立一个socket,你不得不先去recvfrom一下数据,好获得对端的地址,虽然使用PEEK标志可让建立好child socket后再读一次,可是仔细想一想,最完全的方案仍是直接扩展内核,我基于3.9.6内核,对__udp4_lib_rcv这个UDP协议栈接收函数做了如下的修改:

int __udp4_lib_rcv(struct sk_buff *skb, struct udp_table *udptable,
                   int proto)
{
......................
        sk = __udp4_lib_lookup_skb(skb, uh->source, uh->dest, udptable);

        if (sk != NULL) {
                int ret;
#if 1
                // 这个UDP_LISTEN,经过setsockopt来设置
                if (sk->sk_state == UDP_LISTEN) {
                        // 若是是UDP的listener,建立一个数据socket
                        struct sock *newsk = inet_udp_clone_lock(sk, skb, GFP_ATOMIC);
                        if (newsk) {
                                struct inet_sock *newinet;
                                // 为这个数据传输socket根据skb来填充4元组信息
                                newinet               = inet_sk(newsk);
                                newinet->inet_daddr   = ip_hdr(skb)->saddr;
                                newinet->inet_rcv_saddr = ip_hdr(skb)->daddr;
                                newinet->inet_saddr           = ip_hdr(skb)->daddr;
                                rcu_assign_pointer(newinet->inet_opt, NULL);

                                newinet->mc_index     = inet_iif(skb);
                                newinet->mc_ttl       = ip_hdr(skb)->ttl;
                                newinet->rcv_tos      = ip_hdr(skb)->tos;
                                newinet->inet_id = 0xffffffff ^ jiffies;
                                inet_sk_rx_dst_set(newsk, skb);
                                // sock结构体新增csk变量,相似TCP的accept queue,可是为了简单,目前每一个Listen socket只能持有一个csk,即child sock。
                                sk->csk = newsk;

                                // 将新的数据传输socket排入全局的UDP socket hash表
                                if (newsk->sk_prot->get_port(newsk, newinet->inet_num)) {
                                        printk("[UDP listen] get port errorn");
                                        release_sock(newsk);
                                        err = -2;
                                        goto out_go;
                                }
                                ret = udp_queue_rcv_skb(newsk, skb);
                                // 唤醒epoll,让epoll返回UDP Listener
                                sk->sk_data_ready(sk, 0);
                                sock_put(newsk);
                        } else {
                                printk("[UDP listen] create new errorn");
                                sock_put(sk);
                                return -1;
                        }
out_go:
                        sock_put(sk);
                        if (ret > 0)
                                return -ret;
                        return 0;
                }
#endif
                ret = udp_queue_rcv_skb(sk, skb);
                sock_put(sk);
......................
}

我只是测试,所以并无扩展UDP的accept方法,只是简单的用getsocketopt来得到这个新的socket描述符并为task安装该文件描述符,setsockopt能够设置一个UDP socket为listener。这样用户态的编程模型就很简单了。

使用新的Listen UDP来改造OpenUOM

有必要重构一下OpenUOM了,现现在它的逻辑变成了:

listen = 1;
listener = socket(PF_INET, SOCK_DGRAM, 0);
setsockopt(new_sd, SOL_SOCKET, SO_UDPLISTEN, &listen,sizeof(listen));

/* 加入侦听socket */
context.sd = listener;
context.others = dont_care;
listen_ev.events = EPOLLIN;
listen_ev.data.ptr = context;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &listen_ev);

/* 加入TUN网卡 */
tun.sd = tun;
tun.others = dont_care;
entry.ptr = tun;
entry.type = TUN;
tun_ev.events = EPOLLIN;
tun_ev.data.ptr = entry;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, tun, &tun_ev);

while(1) {
    nfds = epoll_wait(kdpfd, events, 10000, -1);
    for (n = 0; n < nfds; ++n) {
        if (events[n].data.ptr == context) {
            getsockopt(context.sd, SOL_SOCKET, &newsock_info....);
            child_sd = newsock_info.sd;
            multi_instance *mi = create_mi(child_sd, newsock_info.remote_addr, ...);
            entry.ptr = mi;
            entry.type = SOCKET;
            new_ev.events = EPOLLIN;
            new_ev.data.ptr = entry;
            epoll_ctl(kdpfd, EPOLL_CTL_ADD, child_sd, &new_ev);
            // 这是UDP,内核除了通知Listener以外,还会将数据排入child_sd,所以须要去读取,能够参考TCP的Fastopen逻辑
            data = recvfrom(child_sd, ....);
            ....
        } else if (events[n].data.ptr.type == SOCKET){
            multi_instance *mi = events[n].data.ptr;
            data = read_from_socket(mi);
            // 这里简化了处理,由于并非每个数据包都是须要加密解密的,还有控制通道的包
            decrypt(mi, data);
            write_to_tun(data);
        } else {
            tun *tun = events[n].data.ptr.ptr;
            packet = read_from_tun(tun);
            lock(mi_hashtable);
            multi_instance *mi = lookup_multi_instance_from(packet);
            unlock(mi_hashtable);
            encrypt(packet);
            write_to_socket(packet, mi);
        }
    }
    ...
}

除了把accept改为了getsockopt以外,别的几乎和TCP的OpenUOM彻底一致了。如此一来,2014年改造的OpenUOM多线程版本就完美了,用户态根本不须要再使用recvfrom返回的address信息来定位multi_instance了,一个multi_instance惟一和一个socket绑定,而每个socket都由epoll来管理,大大下降了用户态查找multi_instance的开销,同时也避免了锁定。

相关文章
相关标签/搜索