Mosquitto 是一个IBM 开源pub/sub订阅发布协议 MQTT 的一个单机版实现(目前也只有单机版),MQTT主打轻便,比较适用于移动设备等上面,花费流量少,解析代价低。相对于XMPP等来讲,简单许多。php
MQTT采用二进制协议,而不是XMPP的XML协议,因此通常消息甚至只须要花费2个字节的大小就能够交换信息了,对于移动开发比较有优点。html
IBM虽然开源了其MQTT消息协议,可是却没有开源其RSMB服务端程序,不过还好目前有比较稳定的实现可用,本文的Mosquitto是其中比较活跃的实现之一,具体在 这里 有目前的实现列表可供选择。web
趁着大脑尚未进入睡眠状态记录一下刚才看代码学到的东西。我下载的版本是1.2.2版,在 这里 能够找到 下载连接 。redis
关于 MQTT 3.1协议 自己比较简单,42页的PDF介绍完了,相比XMPP那长长的文档,谢天谢地了。因为刚看,因此不少细节都没有深刻进去,这里只是记录个大概,后续有时间慢慢补好坑吧。数组
整体来讲,mosquitto实现有以下几个特色:网络
总之,是一个比较简单单能够适用于通常的服务中提供pub/sub功能支持,但若是放到大量并发的系统中,能够优化的地方还有不少。关于mosquitto的性能,暂时没有找到官方的评测,不过在 邮件组里面找到的一些讨论 彷佛显示其性能上限为20W链接时在线的状态,固然具体取决于业务逻辑,交互是否不少等。不过这样的成绩仍是不错的。一台机器能够起多个实例的嘛。session
mosquitto.c文件main开头调用_mosquitto_net_init初始化SSL加密的库,而后调用mqtt3_config_init初始化配置的各个数据结构为默认值。配置文件的解析由mqtt3_config_parse_args牵头完成,具体配置文件解析就很少写了,fgets一行行的读取配置,而后设置到config全局变量中。其中包括对于监听地址等的读取。数据结构
而后保存pid进程号。mqtt3_db_open打开db文件多线程
int main(int argc, char *argv[]) { memset(&int_db, 0, sizeof(struct mosquitto_db)); _mosquitto_net_init(); mqtt3_config_init(&config); rc = mqtt3_config_parse_args(&config, argc, argv);//k: init && load config file, set struct members
配置读取完后,就能够打开监听端口了,使用mqtt3_socket_listen打开监听端口,并将SOCK套接字放在局部变量listensock里面,以便后面统一使用。并发
listener_max = -1; listensock_index = 0; for(i=0; i<config.listener_count; i++){ if(mqtt3_socket_listen(&config.listeners[i])){ _mosquitto_free(int_db.contexts); mqtt3_db_close(&int_db); if(config.pid_file){ remove(config.pid_file); } return 1; } listensock_count += config.listeners[i].sock_count; listensock = _mosquitto_realloc(listensock, sizeof(int)*listensock_count); if(!listensock){ _mosquitto_free(int_db.contexts); mqtt3_db_close(&int_db); if(config.pid_file){ remove(config.pid_file); } return 1; } for(j=0; j<config.listeners[i].sock_count; j++){ if(config.listeners[i].socks[j] == INVALID_SOCKET){ _mosquitto_free(int_db.contexts); mqtt3_db_close(&int_db); if(config.pid_file){ remove(config.pid_file); } return 1; } listensock[listensock_index] = config.listeners[i].socks[j]; if(listensock[listensock_index] > listener_max){ listener_max = listensock[listensock_index]; } listensock_index++; } }
关于mqtt3_socket_listen函数也比较经典,socket(),bind(), listen()的流程,不一样的是使用了新版的套接字信息获取函数getaddrinfo,该函数支持IPV4和IPV6,对应用层透明,不须要处理这些信息。
mqtt3_socket_listen(struct _mqtt3_listener *listener) { snprintf(service, 10, "%d", listener->port); memset(&hints, 0, sizeof(struct addrinfo)); hints.ai_family = PF_UNSPEC; hints.ai_flags = AI_PASSIVE; hints.ai_socktype = SOCK_STREAM; //致使下面返回多个链表节的的因素可能有: //hostname参数关联的地址有多个,那么每一个返回一个节点;好比host为域名的时候,nslookup返回几个ip就有几个 //service参数指定的服务会吃多个套接字接口类型,那么也返回多个 if(getaddrinfo(listener->host, service, &hints, &ainfo)) return INVALID_SOCKET; listener->sock_count = 0; listener->socks = NULL; for(rp = ainfo; rp; rp = rp->ai_next){ //···· sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); if(sock == -1){ strerror_r(errno, err, 256); _mosquitto_log_printf(NULL, MOSQ_LOG_WARNING, "Warning: %s", err); continue; } listener->sock_count++; listener->socks = _mosquitto_realloc(listener->socks, sizeof(int)*listener->sock_count); if(!listener->socks){ _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); return MOSQ_ERR_NOMEM; } listener->socks[listener->sock_count-1] = sock; /* Set non-blocking */ opt = fcntl(sock, F_GETFL, 0); if(bind(sock, rp->ai_addr, rp->ai_addrlen) == -1){ strerror_r(errno, err, 256); _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: %s", err); COMPAT_CLOSE(sock); return 1; } if(listen(sock, 100) == -1){ strerror_r(errno, err, 256); _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: %s", err); COMPAT_CLOSE(sock); return 1; } } freeaddrinfo(ainfo); }
打开监听套接字后,就能够进入消息事件循环,标准网络服务程序的必须过程。这个由main函数调用mosquitto_main_loop启动。mosquitto_main_loop函数主体也是一个大循环,在循环里面进行超时检测,事件处理,网络读写等等。因为使用poll模型,因此就须要在进行poll()等待以前准备须要监听的套接字数组列表pollfds,效率低的地方就在这里。
对于监听套接字,简单将其加入pollfds里面,注册POLLIN可读事件便可。若是对于其余跟客户端等的链接,就须要多作一步操做了。若是是桥接模式,进行相应的处理,这里暂时不介绍桥接模式,桥接模式是为了分布式部署加入的非标准协议,目前只有IBM rsmb和mosquitto实现了。
对于跟客户端的链接,mosquitto会在poll等待以前调用mqtt3_db_message_write尝试发送一次未发送的数据给对方,避免没必要要的等待可能。
int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock_count, int listener_max) { memset(pollfds, -1, sizeof(struct pollfd)*pollfd_count); pollfd_index = 0; for(i=0; i<listensock_count; i++){//注册监听sock的pollfd可读事件。也就是新链接事件 pollfds[pollfd_index].fd = listensock[i]; pollfds[pollfd_index].events = POLLIN; pollfds[pollfd_index].revents = 0; pollfd_index++; } time_count = 0; for(i=0; i<db->context_count; i++){//遍历每个客户端链接,尝试将其加入poll数组中 if(db->contexts[i]){ //···· /* Local bridges never time out in this fashion. */ if(!(db->contexts[i]->keepalive) || db->contexts[i]->bridge || now - db->contexts[i]->last_msg_in < (time_t)(db->contexts[i]->keepalive)*3/2){ //在进入poll等待以前,先尝试将未发送的数据发送出去 if(mqtt3_db_message_write(db->contexts[i]) == MOSQ_ERR_SUCCESS){ pollfds[pollfd_index].fd = db->contexts[i]->sock; pollfds[pollfd_index].events = POLLIN | POLLRDHUP; pollfds[pollfd_index].revents = 0; if(db->contexts[i]->current_out_packet){ pollfds[pollfd_index].events |= POLLOUT; } db->contexts[i]->pollfd_index = pollfd_index; pollfd_index++; }else{//尝试发送失败,链接出问题了 mqtt3_context_disconnect(db, db->contexts[i]); } }else{//超过1.5倍的时间,超时关闭链接 if(db->config->connection_messages == true){ _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Client %s has exceeded timeout, disconnecting.", db->co ntexts[i]->id); } /* Client has exceeded keepalive*1.5 */ mqtt3_context_disconnect(db, db->contexts[i]);//关闭链接,清空数据,后续还能够用.sock=INVALID_SOCKET } }else{ #endif if(db->contexts[i]->clean_session == true){ //这个链接上次因为什么缘由,挂了,设置了clean session,因此这里直接完全清空其结构 mqtt3_context_cleanup(db, db->contexts[i], true); db->contexts[i] = NULL; }else if(db->config->persistent_client_expiration > 0){ //协议规定persistent_client的状态必须永久保存,这里避免链接永远没法删除,增长这个超时选项。 //也就是若是一个客户端断开链接一段时间了,那么咱们会主动干掉他 /* This is a persistent client, check to see if the * last time it connected was longer than * persistent_client_expiration seconds ago. If so, * expire it and clean up. */ if(now > db->contexts[i]->disconnect_t+db->config->persistent_client_expiration){ _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Expiring persistent client %s due to timeout.", db- >contexts[i]->id); #ifdef WITH_SYS_TREE g_clients_expired++; #endif db->contexts[i]->clean_session = true; mqtt3_context_cleanup(db, db->contexts[i], true); db->contexts[i] = NULL; } } #ifdef WITH_BRIDGE }
而后先使用mqtt3_db_message_timeout_check检测一下超时没有收到客户端回包确认的消息,mosquitto对于超时的消息处理,是会进行重发的。不过理论上,TCP是不须要重发的,具体见这里: MQTT消息推送协议应用数据包超时是否须要重发? 不过,因为mosquitto对于客户端断开链接的处理比较弱,链接从新创建后,使用的相关数据结构仍是相同的,所以重发其实也能够,只是这个时候的重发,其实是在一个链接上没有收到ACK回包,而后后续创建的新链接上进行重传。不是在一个链接上重传。可是这样其实也有不少弊端,好比客户端必须支持消息的持久化记录,不然容易双方对不上话的状况。
int mqtt3_db_message_timeout_check(struct mosquitto_db *db, unsigned int timeout) {//循环遍历每个链接的每一个消息msg,看起是否超时,若是超时,将消息状态改成上一个状态,从然后续触发重发 int i; time_t threshold; enum mosquitto_msg_state new_state = mosq_ms_invalid; struct mosquitto *context; struct mosquitto_client_msg *msg; threshold = mosquitto_time() - timeout; for(i=0; i<db->context_count; i++){//遍历每个链接, context = db->contexts[i]; if(!context) continue; msg = context->msgs; while(msg){//遍历每一个msg消息,看看其状态,若是超时了,那么从上一个消息开始重发.其实不须要重发http://chenzhenianqing.cn/ar ticles/977.html //固然若是这个是复用了以前断开过的链接,那就须要重发。可是,这个时候其实能够重发整个消息的。否则容易出问题,客户端难> 度大 if(msg->timestamp < threshold && msg->state != mosq_ms_queued){ switch(msg->state){ case mosq_ms_wait_for_puback: new_state = mosq_ms_publish_qos1; break; case mosq_ms_wait_for_pubrec: new_state = mosq_ms_publish_qos2; break; case mosq_ms_wait_for_pubrel: new_state = mosq_ms_send_pubrec; break; case mosq_ms_wait_for_pubcomp: new_state = mosq_ms_resend_pubrel; break; default: break; } if(new_state != mosq_ms_invalid){ msg->timestamp = mosquitto_time();//设置当前时间,下次依据来判断超时
超时提早检测完成后就能够进入poll等待了。等待完成后,对于有可读事件的链接,调用loop_handle_reads_writes进行事件读写处理,对于监听端口的事件,使用mqtt3_socket_accept去接受新链接。
loop_handle_reads_writes新事件处理函数比较简单,主体仍是循环判断可读可写事件,进行相应的处理。具体很少介绍了,须要关注的是因为是异步读写,因此须要记录上次读写状态,以便下次进入上下午继续读取数据。可写事件由_mosquitto_packet_write完成,可读事件由_mosquitto_packet_read完成。
新客户端链接的事件则由qtt3_socket_accept完成,其会将新链接放在db->contexts[i]数组的某个空位置,每次都会遍历寻找一个空的槽位放新链接。这里有个小优化其实就是用hints的机制,记录上次的查找位置,避免屡次重复的从前面找到后面。
链接读写事件处理完成后,mosquitto会检测是否须要从新reload部分配置文件。这个由SIGHUP的信号触发。
限于篇幅,具体的逻辑请求处理下次再介绍了。
mosquitto是一个简单可依赖的开源MQTT实现,能支持10W左右的同时在线(未亲测),单机版本,但经过bridge桥接模式支持部分分布式,但有限;协议自己很是适合在移动设备上使用,耗电少,处理快,属于header上带有消息体长度的协议,这个在异步网络事件代码编写时是码农最爱的,哈哈。
对于后续的提升优化的地方,简单记录几点:
参考: