twitter公司redis&memcached中间件twemproxy源码分析(一)

twitter公司redis&memcached中间件twemproxy源码分析(一)

twemproxy是redis和memcached链接池中间件html

github项目地址:github.com/twitter/twe…node

项目简介参考:github.com/twitter/twe…linux

文档参考:github.com/twitter/twe…git

核心流程

主流程就是启动了一个事件循环,全部的逻辑经过事件出发调用回调函数执行github

消息流转流程

消息会在三种角色的fd中进行流转:client, server, proxyredis

角色 做用
client 表明应用程序端接入twemproxy的fd
proxy 表明twemproxy服务器的fd,等待应用程序端接入
server 表明twemproxy连接redis/memcache的fd,proxy fd处理程序会把从client fd读到的数据转到server fd中

核心流程以下: 算法

事件系统:epoll/kqueue/evport封装

通常linux使用epoll, mac使用kqueue,Sun Solaris使用evportapi

twemproxy为了统一底层api调用,对上面的三种事件处理api进行了接口的统一: event/nc_event.h,核心api参考:数组

函数 做用
event_base_create() 建立事件循环管理fd
event_add_in() 将一个fd的读事件归入事件管理器的管理中
event_add_out() 将一个fd的读写事件归入事件管理器的管理中
event_add_conn() 将一个fd的读写事件归入事件管理器的管理中
event_wait() 从事件管理器中获取准备好读/写的fd列表,并调用cb进行处理

核心结构体event_base参考: src/event/nc_event.h服务器

struct event_base {
    int                ep;      /* epoll descriptor */
    struct epoll_event *event;  /* event[] - events that were triggered int nevent; /* # event */
    event_cb_t         cb;      /* event callback */
};
复制代码
字段 做用
ep 事件管理器fd
struct epoll_event 这个事件管理器能够处理的事件类型(读、写、hup信号)
int nevent 最大取多少个事件
event_cb_t cb 事件处理回调函数

twemproxy中大量使用事件回调来驱动数据流转,上面的event_cb_t就是一个回调函数

typedef int (*event_cb_t)(void *, uint32_t);
复制代码

核心数据结构和算法

array 变长数组

struct array {
    uint32_t nelem;  /* # element */
    void     *elem;  /* element */
    size_t   size;   /* element size */
    uint32_t nalloc; /* # allocated element */
};
复制代码

经过调整nelem和*elem指针内容实现动态数组

string 变长字符串

struct string {
    uint32_t len;   /* string length */
    uint8_t  *data; /* string data */
};
复制代码

红黑树

struct rbnode {
    struct rbnode *left;     /* left link */
    struct rbnode *right;    /* right link */
    struct rbnode *parent;   /* parent link */
    int64_t       key;       /* key for ordering */
    void          *data;     /* opaque data */
    uint8_t       color;     /* red | black */
};
复制代码

一致性hash

参考:src/hashkit/nc_ketama.c

队列

twemproxy使用的队列封装支持数种不一样类型的队列

参考:src/nc_queue.h,这个队列文件来自freebsd linx内核:github.com/freebsd/fre…

配置文件解析

twemproxy使用yaml格式的配置文件,使用libyaml库解析这个yaml格式的配置文件,参考:github.com/yaml/libyam…

twemproxy配置文件解析逻辑参考:src/nc_conf.h src/nc_conf.c

协议抽象接口

twemproxy支持memcache, redis两种协议,抽象成接口能够把这个协议从twemproxy主流程中解耦、这样也利于添加新的协议支持

memcached

void memcache_parse_req(struct msg *r);
void memcache_parse_rsp(struct msg *r);
bool memcache_failure(struct msg *r);
void memcache_pre_coalesce(struct msg *r);
void memcache_post_coalesce(struct msg *r);
rstatus_t memcache_add_auth(struct context *ctx, struct conn *c_conn, struct conn *s_conn);
rstatus_t memcache_fragment(struct msg *r, uint32_t ncontinuum, struct msg_tqh *frag_msgq);
rstatus_t memcache_reply(struct msg *r);
void memcache_post_connect(struct context *ctx, struct conn *conn, struct server *server);
void memcache_swallow_msg(struct conn *conn, struct msg *pmsg, struct msg *msg);
复制代码

redis

void redis_parse_req(struct msg *r);
void redis_parse_rsp(struct msg *r);
bool redis_failure(struct msg *r);
void redis_pre_coalesce(struct msg *r);
void redis_post_coalesce(struct msg *r);
rstatus_t redis_add_auth(struct context *ctx, struct conn *c_conn, struct conn *s_conn);
rstatus_t redis_fragment(struct msg *r, uint32_t ncontinuum, struct msg_tqh *frag_msgq);
rstatus_t redis_reply(struct msg *r);
void redis_post_connect(struct context *ctx, struct conn *conn, struct server *server);
void redis_swallow_msg(struct conn *conn, struct msg *pmsg, struct msg *msg);
复制代码

能够看到redis和memcached协议被统一封装成了两组对等的接口,对应实现参考:nc_memcache.c nc_redis.c

监控系统

twemproxy使用单独的线程处理监控数据获取请求,参考src/nc_stats.c::stats_start_aggregator

static rstatus_t stats_start_aggregator(struct stats *st) {
    rstatus_t status;
    // error handle...
    status = stats_listen(st);
    // error handle...

    status = pthread_create(&st->tid, NULL, stats_loop, st);
    // error handle...
    return NC_OK;
}
复制代码

这个监控线程和主线程间使用结构体共享监控数据,主线程往这个结构体中写入实时监控数据,监控线程接到客户端获取监控数据的请求后,从这个结构体中读取监控数据输出给客户端,核心结构体参考:src/nc_stats.h

typedef enum stats_type {
    // ...
} stats_type_t;

struct stats_metric {
    // ...
};

struct stats_server {
    // ...
};

struct stats_pool {
    // ...
};

struct stats_buffer {
    // ...
};

struct stats {
    // ...
};
复制代码

使用命令curl -s 127.0.0.1:22222 | jq .能够查看实时的监控信息

信号处理

参考:src/nc_signal.h src/nc_signal.c

static struct signal signals[] = {
    { SIGUSR1, "SIGUSR1", 0,                 signal_handler },
    { SIGUSR2, "SIGUSR2", 0,                 signal_handler },
    { SIGTTIN, "SIGTTIN", 0,                 signal_handler },
    { SIGTTOU, "SIGTTOU", 0,                 signal_handler },
    { SIGHUP,  "SIGHUP",  0,                 signal_handler },
    { SIGINT,  "SIGINT",  0,                 signal_handler },
    { SIGSEGV, "SIGSEGV", (int)SA_RESETHAND, signal_handler },
    { SIGPIPE, "SIGPIPE", 0,                 SIG_IGN },
    { 0,        NULL,     0,                 NULL }
};
复制代码

能够看到twemproxy对多个信号进行回调:signal_handler

void signal_handler(int signo) {
    //...

    switch (signo) {
    case SIGUSR1:
        break;

    case SIGUSR2:
        break;

    case SIGTTIN:
        actionstr = ", up logging level";
        action = log_level_up;
        break;

    case SIGTTOU:
        actionstr = ", down logging level";
        action = log_level_down;
        break;

    case SIGHUP:
        actionstr = ", reopening log file";
        action = log_reopen;
        break;

    case SIGINT:
        done = true;
        actionstr = ", exiting";
        break;

    case SIGSEGV:
        log_stacktrace();
        actionstr = ", core dumping";
        raise(SIGSEGV);
        break;

    default:
        NOT_REACHED();
    }

    log_safe("signal %d (%s) received%s", signo, sig->signame, actionstr);

    if (action != NULL) {
        action();
    }

    if (done) {
        exit(1);
    }
}
复制代码

能够看到使用SIGTTIN/信号增长日志输出级别,SIGTTOU下降日志输入级别, SIGHUP从新打开日志文件,SIGINT退出程序,SIGSEGV输出内存core dump文件

一些注意的点

本文基于twemproxy 0.4.1写成

尽管codis目前已经成为redis中间件的事实选择,可是twemproxy做为第一款memcache&redis中间件,仍具备比较大学习价值,而且支持memcache

参考资料

  1. man7.org/linux/man-p…
  2. blog.just4fun.site/command-too…
  3. unix.stackexchange.com/questions/1…
  4. www.jianshu.com/p/83ac498e1…
相关文章
相关标签/搜索