redis的通讯模块封装得很是简单易用,能够直接用到本身项目中,学习下也是颇有价值的。git
本文基于redis源码4.0.1写成,redis源码下载:github.com/antirez/red…github
redis的网络通讯模块由8个文件构成 redis
文件 | 做用 |
---|---|
ae.c | 统一epoll、evport、kqueue、select网络事件处理接口, 函数实现 |
ae.h | 统一epoll、evport、kqueue、select网络事件处理接口,函数原型,共享结构体定义 |
ae_epoll.c | 封装epoll网络事件处理库到统一的接口 |
ae_evport.c | 封装evport网络事件处理库到统一的接口 |
ae_kqueue.c | 封装kqueue网络事件处理库到统一的接口 |
ae_select.c | 封装select网络事件处理库到统一的接口 |
被统一的网络事件处理接口以下,参考:ae_epoll.c, ae_evport.c, ae_kqueue.c, ae_select.cshell
接口 | 做用 |
---|---|
aeApiState | 底层网络库须要的fd、事件等数据共享结构体 |
aeApiCreate | 建立网络句柄,好比epoll句柄,在这个基础上才能进行网络事件的监听处理 |
aeApiResize | 修改网络库存放事件的容器大小,就是修改aeApiState结构体的events数组的大小 |
aeApiFree | 删除aeApiState这个共享结构体 |
aeApiAddEvent | 将网络fd的读写操做交给网络库进行处理,好比给这个epoll进行处理 |
aeApiDelEvent | 从网络操做库中删除对某个fd的监听,通常服务器往客户端写完数据后,主动断开客户端链接时会使用 |
aeApiPoll | 轮询获取网络正在发生io读写事件的事件 |
aeApiName | 获取底层网络库的名字,好比epool,kqueue等 |
参考 ae.h, ae.c,封装通用的结构体和函数api
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
复制代码
这个结构体封装了网络数据读写事件的处理函数原型,和客户端传过来的数据。数组
aeFileProc是处理函数的原型的宏定义,宏定义参考以下:服务器
typedef void aeFileProc( struct aeEventLoop *eventLoop, int fd, void *clientData, int mask );
复制代码
实际使用时,网络读写事件的函数处理须要自行编写网络
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *next;
} aeTimeEvent;
复制代码
这个定时事件是redis用来处理后台定时任务使用的,好比处理客户端链接超时,服务端断开操做socket
/* A fired event */
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;
复制代码
就是正在等待读、正在等待写的网络fd,通常以一个数组出现tcp
mask字段表示是等待读,仍是等待写,对应的值以下
#define AE_READABLE 1
#define AE_WRITABLE 2
复制代码
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
} aeEventLoop;
复制代码
核心字段以下 * 事件列表*events, index表示fd, 表示网络fd的读写事件发生时,执行的回调函数 * *fired 正在等待读和写的fd列表,经过轮询获得,好比epoll经过epoll_wait获得的 * beforesleep 在epoll_wait前执行 * aftersleep 在epoll_wait后执行
主要是对aeEventLoop结构体进行操做,这里拿几个核心的函数举例
这个函数主要是建立aeEventLoop结构体,并根据setsize参数给events, fired分配内容空间 而后再调用aeApiCreate驱动底层的网络库,好比epoll的epoll_create(),而后保存到aeEventLoop的apidata属性上
主要是启动无限循环、轮询处理活跃的fd,若是epoll,底层则调用epoll_wait获取可读写的fd:ae.c::aeMain() -> ae.c::aeProcessEvents() -> ae_epoll.c::aeApiPool() -> ae_epoll.c::epoll_wait()
注册网络fd到底层网络库上,并注册当读写事件发生时,须要执行的业务回调函数 拿epoll举例,调用到epoll通过的步骤:ae.c::aeCreateFileEvent() -> ae_epoll.c::aeApiAddEvent() -> epoll_ctl()
主要是封装socket操做,屏蔽系统底层socket操做的差别性,提供更好用的api,好比建立tcp server等
这里就拿建立tcp server来看看它的流程
anet.c::anetTcpServer() -> anet.c::_anetTcpServer() -> <sys/socket.h>::socket() -> anet.c::anetListen() -> <sys/socket.h>::listen()
核心代码就是调用系统socket库的listen函数创建起了一个tcp的server
这里使用ae.h和anet.h来建立一个简单的tcp服务器,并把给客户端传回一个"Hello World"
代码以下:
#include <stdio.h>
#include <zconf.h>
#include "../src/ae.h"
#include "../src/anet.h"
char myerr[ANET_ERR_LEN] = {0};
void acceptFd(struct aeEventLoop *eventLoop, int fd, void *clientdata, int mask) {
char myerr[ANET_ERR_LEN] = {0};
printf("获取客户端链接的fd.\n");
char ip[20] = {0};
int port = 0;
int clientfd = anetTcpAccept(myerr, fd, ip, sizeof(ip), &port);
if (clientfd == AE_ERR) {
printf("获取客户端链接的fd异常!! \n");
return;
}
printf("客户端ip %s port %d \n", ip, port);
int ret = anetNonBlock(myerr, clientfd);
if (ret == ANET_OK) {
printf("客户端事件非阻塞处理设置成功\n\n");
}
anetEnableTcpNoDelay(myerr, clientfd);
write(clientfd,"Hello Client!\n",14);
}
int main() {
aeEventLoop *eventLoop = aeCreateEventLoop(1024);
if(!eventLoop){
return 1;
}
int fd = anetTcpServer(myerr, 8080, "0.0.0.0", 511);
if (fd != ANET_ERR) {
anetNonBlock(NULL, fd);
if (aeCreateFileEvent(eventLoop, fd, AE_READABLE, acceptFd, NULL)) {
printf("注册tcp服务器接收客户端链接的事件处理函数异常");
}
}
printf("服务器在 0.0.0.0:8080 端口监听了! \n");
aeMain(eventLoop);
aeDeleteEventLoop(eventLoop);
return 0;
}
复制代码
而后进行编译并启动,编译可使用gcc,效果以下
gcc -o myserver src/ae.c src/anet.c src/zmalloc.c myserver.c
复制代码
可使用telnet进行测试
Mac OS 10.14.5下若是没法找到c的标准库,须要安装这个文件:/Library/Developer/CommandLineTools/Packages/macOS_SDK_headers_for_macOS_10.14.pkg