传送文件描述符是高并发网络服务编程的一种常见实现方式。Nebula 高性能通用网络框架即采用了UNIX域套接字传递文件描述符设计和实现。本文详细说明一下传送文件描述符的应用。html
开发一个服务器程序,有较多的的程序设计范式可供选择,不一样范式有其自身的特色和实用范围,明了不一样范式的特性有助于咱们服务器程序的开发。常见的TCP服务器程序设计范式有如下几种:linux
当系统负载较轻时,传统的并发服务器程序模型就够了。相对于传统的每一个客户一次fork设计,预先建立一个进程池或线程池能够减小进程控制CPU时间,大约可减小10倍以上。nginx
某些实现容许多个子进程或线程阻塞在accept上,然而在另外一些实现中,咱们必须使用文件锁、线程互斥锁或其余类型的锁来确保每次只有一个子进程或线程在accept。c++
通常来说,全部子进程或线程都调用accept要比父进程或主线程调用accept后将描述字传递个子进程或线程来得快且简单。git
Nebula框架是预先建立多进程,由Manager主进程accept后传递文件描述符到Worker子进程的服务模型(Nebula进程模型)。为何不采用像nginx那样多线程由子线程使用互斥锁上锁保护accept的服务模型?并且这种服务模型的实现比传递文件描述符来得还简单一些。github
Nebula框架采用无锁设计,进程以前彻底不共享数据,不存在须要互斥访问的地方。没错,会存在数据多副本问题,但这些多副本每每只是些配置数据,占用不了太大内存,与加锁解锁带来的代码复杂度及锁开销相比这点内存代价更划算也更简单。编程
同一个Nebula服务的工做进程间不相互通讯,采用进程和线程并没有太大差别,之因此采用进程而不是线程的最重要考虑是Nebula是出于稳定性和容错性考虑。Nebula是通用框架,彻底业务无关,业务都是经过动态加载的方式或经过将Nebula连接进业务Server的方式来实现。Nebula框架没法预知业务代码的质量,但能够保证在服务因业务代码致使coredump或其余状况时,框架能够实时监控到并马上拉起服务进程,最大程度保障服务可用性。数组
决定Nebula采用传递文件描述符方式的最重要一点是:Nebula定位是高性能分布式服务集群解决方案的基础通讯框架,其设计更多要为构建分布式服务集群而考虑。集群不一样服务节点之间经过TCP通讯,而全部逻辑都是Worker进程负责,这意味着节点之间通讯须要指定到Worker进程,而若是采用子进程竞争accept的方式没法保证指定的子进程得到资源,那么第一个通讯数据包将会路由错误。采用传递文件描述符方式能够很完美地解决这个问题,并且传递文件描述符也很是高效。服务器
文件描述符传递经过调用sendmsg()函数发送,调用recvmsg()函数接收:网络
#include <sys/types.h> #include <sys/socket.h> ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags); ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
这两个函数与sendto和recvfrom函数类似,只不过能够传输更复杂的数据结构,不只能够传输通常数据,还能够传输额外的数据,即文件描述符。下面来看结构体msghdr及其相关结构体 :
struct msghdr { void *msg_name; /* optional address */ socklen_t msg_namelen; /* size of address */ struct iovec *msg_iov; /* scatter/gather array */ size_t msg_iovlen; /* # elements in msg_iov */ void *msg_control; /* ancillary data, see below */ size_t msg_controllen; /* ancillary data buffer len */ int msg_flags; /* flags on received message */ }; /* iovec结构体 */ struct iovec { void *iov_base; /* Starting address */ size_t iov_len; /* Number of bytes to transfer */ }; /* cmsghdr结构体 */ struct cmsghdr { socklen_t cmsg_len; /* data byte count, including header */ int cmsg_level; /* originating protocol */ int cmsg_type; /* protocol-specific type */ /* followed by unsigned char cmsg_data[]; */ };
msghdr结构成员说明:
为了对齐,可能存在一些填充字节,跟不一样系统的实现有关控制信息的数据部分,是直接存储在cmsghdr结构体的cmsg_type以后的。但中间可能有一些因为对齐产生的填充字节,因为这些填充数据的存在,对于这些控制数据的访问,必须使用Linux提供的一些专用宏来完成:
#include <sys/socket.h> /* 返回msgh所指向的msghdr类型的缓冲区中的第一个cmsghdr结构体的指针。*/ struct cmsghdr *CMSG_FIRSTHDR(struct msghdr *msgh); /* 返回传入的cmsghdr类型的指针的下一个cmsghdr结构体的指针。 */ struct cmsghdr *CMSG_NXTHDR(struct msghdr *msgh, struct cmsghdr *cmsg); /* 根据传入的length大小,返回一个包含了添加对齐做用的填充数据后的大小。 */ size_t CMSG_ALIGN(size_t length); /* 传入的参数length指的是一个控制信息元素(即一个cmsghdr结构体)后面数据部分的字节数,返回的是这个控制信息的总的字节数,即包含了头部(即cmsghdr各成员)、数据部分和填充数据的总和。*/ size_t CMSG_SPACE(size_t length); /* 根据传入的cmsghdr指针参数,返回其后面数据部分的指针。*/ size_t CMSG_LEN(size_t length); /* 传入的参数是一个控制信息中的数据部分的大小,返回的是这个根据这个数据部分大小,须要配置的cmsghdr结构体中cmsg_len成员的值。这个大小将为对齐添加的填充数据也包含在内。*/ unsigned char *CMSG_DATA(struct cmsghdr *cmsg);
sendmsg提供了能够传递控制信息的功能,要实现的传递描述符这一功能必需要用到这个控制信息。在msghdr变量的cmsghdr成员中,由控制头cmsg_level和cmsg_type来设置传递文件描述符这一属性,并将要传递的文件描述符做为数据部分,保存在cmsghdr变量的后面。这样就能够实现传递文件描述符这一功能,这种状况是不须要使用msg_iov来传递数据的。
具体地说,为msghdr的成员msg_control分配一个cmsghdr的空间,将该cmsghdr结构的cmsg_level设置为SOL_SOCKET,cmsg_type设置为SCM_RIGHTS,并将要传递的文件描述符做为数据部分,调用sendmsg便可。其中SCM表示socket-level control message,SCM_RIGHTS表示咱们要传递访问权限。
跟发送部分同样,为控制信息配置好属性,并在其后分配一个文件描述符的数据部分后,在成功调用recvmsg后,控制信息的数据部分就是在接收进程中的新的文件描述符了,接收进程可直接对该文件描述符进行操做。
文件描述符传递并非将文件描述符数字传递,而是文件描述符对应数据结构。在主进程accept的到的文件描述符7传递到子进程后文件描述符有多是7,更有多是7之外的其余数值,但不管是什么数值并不重要,重要的是传递以后的链接跟传递以前的链接是同一个链接。
一般在完成文件描述符传递后,接收进程接管文件描述符,发送进程则应调用close关闭已传递的文件描述符。发送进程关闭描述符并不形成关闭该文件或设备,由于该描述符对应的文件仍被视为由接收者进程打开(即便接收进程还没有接收到该描述符)。
文件描述符传递可经由基于STREAMS的管道,也可经由UNIX域套接字。两种方式在《UNIX网络编程》中均有描述,Nebula采用的UNIX域套接字传递文件描述符。
建立用于传递文件描述符的UNIX域套接字用到socketpair函数:
#include <sys/types.h> #include <sys/socket.h> int socketpair(int d, int type, int protocol, int sv[2]);
传入的参数sv为一个整型数组,有两个元素。当调用成功后,这个数组的两个元素即为2个文件描述符。一对链接起来的Unix匿名域套接字就创建起来了,它们就像一个全双工的管道,每一端都既可读也可写。
Nebula框架的文件描述符属于SocketChannel的基本属性,文件描述符传递方法是SocketChannel的静态方法。
文件描述符传递方法声明:
static int SendChannelFd(int iSocketFd, int iSendFd, int iCodecType, std::shared_ptr<NetLogger> pLogger); static int RecvChannelFd(int iSocketFd, int& iRecvFd, int& iCodecType, std::shared_ptr<NetLogger> pLogger);
文件描述符发送方法实现:
/** * @brief 发送文件描述符 * @param iSocketFd 由socketpair()建立的UNIX域套接字,用于传递文件描述符 * @param iSendFd 待发送的文件描述符 * @param iCodecType 通讯通道编解码类型 * @param pLogger 日志类指针 * @return errno 错误码 */ int SocketChannel::SendChannelFd(int iSocketFd, int iSendFd, int iCodecType, std::shared_ptr<NetLogger> pLogger) { ssize_t n; struct iovec iov[1]; struct msghdr msg; tagChannelCtx stCh; int iError = 0; stCh.iFd = iSendFd; stCh.iCodecType = iCodecType; union { struct cmsghdr cm; char space[CMSG_SPACE(sizeof(int))]; } cmsg; if (stCh.iFd == -1) { msg.msg_control = NULL; msg.msg_controllen = 0; } else { msg.msg_control = (caddr_t) &cmsg; msg.msg_controllen = sizeof(cmsg); memset(&cmsg, 0, sizeof(cmsg)); cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); cmsg.cm.cmsg_level = SOL_SOCKET; cmsg.cm.cmsg_type = SCM_RIGHTS; *(int *) CMSG_DATA(&cmsg.cm) = stCh.iFd; } msg.msg_flags = 0; iov[0].iov_base = (char*)&stCh; iov[0].iov_len = sizeof(tagChannelCtx); msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = 1; n = sendmsg(iSocketFd, &msg, 0); if (n == -1) { pLogger->WriteLog(neb::Logger::ERROR, __FILE__, __LINE__, __FUNCTION__, "sendmsg() failed, errno %d", errno); iError = (errno == 0) ? ERR_TRANSFER_FD : errno; return(iError); } return(ERR_OK); }
文件描述符接收方法实现:
/** * @brief 接收文件描述符 * @param iSocketFd 由socketpair()建立的UNIX域套接字,用于传递文件描述符 * @param iRecvFd 接收到的文件描述符 * @param iCodecType 接收到的通讯通道编解码类型 * @param pLogger 日志类指针 * @return errno 错误码 */ int SocketChannel::RecvChannelFd(int iSocketFd, int& iRecvFd, int& iCodecType, std::shared_ptr<NetLogger> pLogger) { ssize_t n; struct iovec iov[1]; struct msghdr msg; tagChannelCtx stCh; int iError = 0; union { struct cmsghdr cm; char space[CMSG_SPACE(sizeof(int))]; } cmsg; iov[0].iov_base = (char*)&stCh; iov[0].iov_len = sizeof(tagChannelCtx); msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = 1; msg.msg_control = (caddr_t) &cmsg; msg.msg_controllen = sizeof(cmsg); n = recvmsg(iSocketFd, &msg, 0); if (n == -1) { pLogger->WriteLog(neb::Logger::ERROR, __FILE__, __LINE__, __FUNCTION__, "recvmsg() failed, errno %d", errno); iError = (errno == 0) ? ERR_TRANSFER_FD : errno; return(iError); } if (n == 0) { pLogger->WriteLog(neb::Logger::ERROR, __FILE__, __LINE__, __FUNCTION__, "recvmsg() return zero, errno %d", errno); iError = (errno == 0) ? ERR_TRANSFER_FD : errno; return(ERR_CHANNEL_EOF); } if ((size_t) n < sizeof(tagChannelCtx)) { pLogger->WriteLog(neb::Logger::ERROR, __FILE__, __LINE__, __FUNCTION__, "rrecvmsg() returned not enough data: %z, errno %d", n, errno); iError = (errno == 0) ? ERR_TRANSFER_FD : errno; return(iError); } if (cmsg.cm.cmsg_len < (socklen_t) CMSG_LEN(sizeof(int))) { pLogger->WriteLog(neb::Logger::ERROR, __FILE__, __LINE__, __FUNCTION__, "recvmsg() returned too small ancillary data"); iError = (errno == 0) ? ERR_TRANSFER_FD : errno; return(iError); } if (cmsg.cm.cmsg_level != SOL_SOCKET || cmsg.cm.cmsg_type != SCM_RIGHTS) { pLogger->WriteLog(neb::Logger::ERROR, __FILE__, __LINE__, __FUNCTION__, "recvmsg() returned invalid ancillary data level %d or type %d", cmsg.cm.cmsg_level, cmsg.cm.cmsg_type); iError = (errno == 0) ? ERR_TRANSFER_FD : errno; return(iError); } stCh.iFd = *(int *) CMSG_DATA(&cmsg.cm); if (msg.msg_flags & (MSG_TRUNC|MSG_CTRUNC)) { pLogger->WriteLog(neb::Logger::ERROR, __FILE__, __LINE__, __FUNCTION__, "recvmsg() truncated data"); iError = (errno == 0) ? ERR_TRANSFER_FD : errno; return(iError); } iRecvFd = stCh.iFd; iCodecType = stCh.iCodecType; return(ERR_OK); }
Manager进程的void Manager::CreateWorker()方法建立用于传递文件描述符的UNIX域套接字:
int iControlFds[2]; int iDataFds[2]; if (socketpair(PF_UNIX, SOCK_STREAM, 0, iControlFds) < 0) { LOG4_ERROR("error %d: %s", errno, strerror_r(errno, m_szErrBuff, 1024)); } if (socketpair(PF_UNIX, SOCK_STREAM, 0, iDataFds) < 0) { LOG4_ERROR("error %d: %s", errno, strerror_r(errno, m_szErrBuff, 1024)); }
Manager进程发送文件描述符:
int iCodec = m_stManagerInfo.eCodec; // 将编解码方式和文件描述符一同发送给Worker进程 int iErrno = SocketChannel::SendChannelFd(worker_pid_fd.second, iAcceptFd, iCodec, m_pLogger); if (iErrno == 0) { AddWorkerLoad(worker_pid_fd.first); } else { LOG4_ERROR("error %d: %s", iErrno, strerror_r(iErrno, m_szErrBuff, 1024)); } close(iAcceptFd); // 发送完毕,关闭文件描述符
Worker进程接收文件描述符:
int iAcceptFd = -1; int iCodec = 0; // 这里的编解码方式在RecvChannelFd方法中得到 int iErrno = SocketChannel::RecvChannelFd(m_stWorkerInfo.iManagerDataFd, iAcceptFd, iCodec, m_pLogger);
至此,Nebula框架的文件描述符传递分享完毕,下面再看看nginx中的文件描述符传递实现。
Nginx的文件描述符传递代码在os/unix/ngx_channel.c文件中。
nginx中发送文件描述符代码:
ngx_int_t ngx_write_channel(ngx_socket_t s, ngx_channel_t *ch, size_t size, ngx_log_t *log) { ssize_t n; ngx_err_t err; struct iovec iov[1]; struct msghdr msg; #if (NGX_HAVE_MSGHDR_MSG_CONTROL) union { struct cmsghdr cm; char space[CMSG_SPACE(sizeof(int))]; } cmsg; if (ch->fd == -1) { msg.msg_control = NULL; msg.msg_controllen = 0; } else { msg.msg_control = (caddr_t) &cmsg; msg.msg_controllen = sizeof(cmsg); ngx_memzero(&cmsg, sizeof(cmsg)); cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); cmsg.cm.cmsg_level = SOL_SOCKET; cmsg.cm.cmsg_type = SCM_RIGHTS; /* * We have to use ngx_memcpy() instead of simple * *(int *) CMSG_DATA(&cmsg.cm) = ch->fd; * because some gcc 4.4 with -O2/3/s optimization issues the warning: * dereferencing type-punned pointer will break strict-aliasing rules * * Fortunately, gcc with -O1 compiles this ngx_memcpy() * in the same simple assignment as in the code above */ ngx_memcpy(CMSG_DATA(&cmsg.cm), &ch->fd, sizeof(int)); } msg.msg_flags = 0; #else if (ch->fd == -1) { msg.msg_accrights = NULL; msg.msg_accrightslen = 0; } else { msg.msg_accrights = (caddr_t) &ch->fd; msg.msg_accrightslen = sizeof(int); } #endif iov[0].iov_base = (char *) ch; iov[0].iov_len = size; msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = 1; n = sendmsg(s, &msg, 0); if (n == -1) { err = ngx_errno; if (err == NGX_EAGAIN) { return NGX_AGAIN; } ngx_log_error(NGX_LOG_ALERT, log, err, "sendmsg() failed"); return NGX_ERROR; } return NGX_OK; }
nginx中接收文件描述符代码:
ngx_int_t ngx_read_channel(ngx_socket_t s, ngx_channel_t *ch, size_t size, ngx_log_t *log) { ssize_t n; ngx_err_t err; struct iovec iov[1]; struct msghdr msg; #if (NGX_HAVE_MSGHDR_MSG_CONTROL) union { struct cmsghdr cm; char space[CMSG_SPACE(sizeof(int))]; } cmsg; #else int fd; #endif iov[0].iov_base = (char *) ch; iov[0].iov_len = size; msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = 1; #if (NGX_HAVE_MSGHDR_MSG_CONTROL) msg.msg_control = (caddr_t) &cmsg; msg.msg_controllen = sizeof(cmsg); #else msg.msg_accrights = (caddr_t) &fd; msg.msg_accrightslen = sizeof(int); #endif n = recvmsg(s, &msg, 0); if (n == -1) { err = ngx_errno; if (err == NGX_EAGAIN) { return NGX_AGAIN; } ngx_log_error(NGX_LOG_ALERT, log, err, "recvmsg() failed"); return NGX_ERROR; } if (n == 0) { ngx_log_debug0(NGX_LOG_DEBUG_CORE, log, 0, "recvmsg() returned zero"); return NGX_ERROR; } if ((size_t) n < sizeof(ngx_channel_t)) { ngx_log_error(NGX_LOG_ALERT, log, 0, "recvmsg() returned not enough data: %z", n); return NGX_ERROR; } #if (NGX_HAVE_MSGHDR_MSG_CONTROL) if (ch->command == NGX_CMD_OPEN_CHANNEL) { if (cmsg.cm.cmsg_len < (socklen_t) CMSG_LEN(sizeof(int))) { ngx_log_error(NGX_LOG_ALERT, log, 0, "recvmsg() returned too small ancillary data"); return NGX_ERROR; } if (cmsg.cm.cmsg_level != SOL_SOCKET || cmsg.cm.cmsg_type != SCM_RIGHTS) { ngx_log_error(NGX_LOG_ALERT, log, 0, "recvmsg() returned invalid ancillary data " "level %d or type %d", cmsg.cm.cmsg_level, cmsg.cm.cmsg_type); return NGX_ERROR; } /* ch->fd = *(int *) CMSG_DATA(&cmsg.cm); */ ngx_memcpy(&ch->fd, CMSG_DATA(&cmsg.cm), sizeof(int)); } if (msg.msg_flags & (MSG_TRUNC|MSG_CTRUNC)) { ngx_log_error(NGX_LOG_ALERT, log, 0, "recvmsg() truncated data"); } #else if (ch->command == NGX_CMD_OPEN_CHANNEL) { if (msg.msg_accrightslen != sizeof(int)) { ngx_log_error(NGX_LOG_ALERT, log, 0, "recvmsg() returned no ancillary data"); return NGX_ERROR; } ch->fd = fd; } #endif return n; }
Nebula框架系列技术分享 之 《经过UNIX域套接字传递文件描述符》。 若是以为这篇文章对你有用,若是以为Nebula框架还能够,帮忙到Nebula的Github或码云给个star,谢谢。Nebula不只是一个框架,还提供了一系列基于这个框架的应用,目标是打造一个高性能分布式服务集群解决方案。
参考资料: