mosquitto服务器源码分析并修改将发布的数据保存到数据库

mosquitto服务器不保存数据,只是转发,若是想要保存数据只能对源码进行修改,想实现mosquitto服务器(broker)接收到发布端(pub)的数据后将收到数据保存到数据库,等到有订阅端订阅主题就能够调数据库里相对应的主题(topic)的数据发送给客户端(sub)。html

1、Mosquitto源码中的结构体

几个常看到的结构体便于源码理解:sql

struct mosquitto用来保存一个客户端链接的全部信息,如用户名、密码、用户ID、向该客户端发送的消息等数据库

struct mosquitto{数组

    mosq_sock_t sock;                                                //链接套接字安全

    enum mosquitto__protocol protocol;                      //客户端使用的协议版本号服务器

    char *address;                                                        //客户端ip数据结构

    char *id;                                                                  //客户端id异步

    char *username;                                                     //客户端的用户名socket

    char *password;                                            //客户端密码(安全认证时使用)函数

    uint16_t keepalive;                                      //保活时间

    uint16_t last_mid;

    enum mosquitto_client_state state;                  //客户端的状态

    time_t last_msg_in;                                    //收到的上一条消息的时间

    time_t next_msg_out;                                  //下一条待发送的消息的时间

    time_t ping_t;                                          //发送ping request的时间间隔

    struct mosquitto__packet  in_packet;                      //收到的报文

    struct mosquitto__packet *out_packet;                    //发送报文链表

    struct mosquitto_message *will;                              //遗嘱消息链表

    struct mosquitto_message_all *in_messages;                //收到的消息链表

    struct mosquitto_message_all *out_messages;              //发送的消息链表

    .....

}

struct mosquitto_message{

int mid;

char *topic;

void *payload;

int payloadlen;

int qos;

bool retain;

};

 

struct _mosquitto_subhier在mosquitto_broker.h中定义,用于保存订阅树的节点(包括叶子节点和中间节点),mosquitto对订阅树采用孩子 - 兄弟链表法的方式进行存储,该存储方式主要借助于数据结构struct _mosquitto_subhier来完成。

 struct _mosquitto_subhier {

    struct _mosquitto_subhier * children; //第一个孩子节点

    struct _mosquitto_subhier * next; //下一个兄弟节点

    struct _mosquitto_subleaf * subs; //订阅列表

    char * topic; //该节点对应的主题片断

    struct mosquitto_msg_store * retain; //该主题下被保留标记的消息

};

 

struct mosquitto_msg_store{

struct mosquitto_msg_store *next;

struct mosquitto_msg_store *prev;

dbid_t db_id;

char *source_id;

char **dest_ids;

int dest_id_count;

int ref_count;

char* topic;

mosquitto__payload_uhpa payload;

uint32_t payloadlen;

uint16_t source_mid;

uint16_t mid;

uint8_t qos;

bool retain;

};

参考:http://www.360doc.com/content/19/0324/20/62984756_823882579.shtml

2、mosquitto源码分析

参考:https://blog.csdn.net/weixin_38498942/article/details/88680291

服务端的实现逻辑主要是在/lib和/src目录下,main函数所在文件是:/mosquitto-1.5.5/src/mosquitto.c,其大体流程是:

 1. 调用net__broker_init函数,建立套接字。

 2. 调用config__init函数,初始化服务端配置相关项。配置相关的结构体:struct mosquitto__config。

 3. 调用config__parse_args,根据配置文件和命令行参数初始化服务端配置。

 4. 调用db__open函数,主要是建立了订阅树。该变量是服务端最重要的结构体,维护了订阅树根节点,订阅客户端的索引,消息链表等。

 5. 调用mosquitto_security_module_init函数,给db中安全认证相关的成员变量初始化。

 6. 调用mosquitto_security_init,主要是函数指针调函数(上一步已经给函数指针赋值),实现安全认证相关的操做。

 7. 根据config.listener_count数目,循环调用net__socket_listen函数,在该函数中建立监听套接字,设置套接字属性,并开始监听。

 8. 接下来将全部config.listener[i]的套接字统一放到listensock数组中去管理;

 9. 调用drop_privileges函数,若是以root身份运行,此函数将尝试更改成非特权用户和组。

 10. 接下来是调用signal函数,为信号注册信号处理函数,实现消息的异步通知。

11.最后进入到mosquitto_main_loop函数,该函数的主要功能就是处理客户端的订阅以及发布等请求,客户端订阅的主题有发布调用db__message_write()发布消息。

12.db__message_write();根据qos质量不一样调用send_publish给已经订阅的客户端

13.send__publish();查看链接跟主题是否有效接着调用send__real_publish()

14.send__real_publish();调用packet__write_byte() 大概是装包,返回packet__queue();

15.packet__queue();也仍是对数据的封装,返回packet__write();

16.packet__write();调用net__write(),net__write()里若是是window调用write(),若是不是调用send(),最后发布出去,大体这样一个流程

 

 

可是db__message_write()是有订阅端订阅了对应的主题才会发布出去,若是没有订阅,单是发布,消息会被遗弃不会被发布出去。依旧在mosquitto_main_loop里loop_handle_reads_writes根据读写事件发送或接收数据包。

       1.loop_handle_reads_writes(),使用多路复用poll监听事件,根据读写事件发送或接收数据包,当有事件可读调用packet__read()。

       2.packet__read()根据mosq->in_packet.command读取有关packet的内容,而后调用handle__packet();

       3.handle__packet()根据mosq->in_packet.command返回调用handle__publish()函数, case PUBLISH:     return handle__publish(mosq);

       4.handle__publish()须要发布的数据进入到handle__publish函数,函数的最后根据qos服务质量调用message__cleanup()清空掉,因此在handle__publish函数能够读取到发布端发布的消息即便没有被订阅。在这能够将发布的数据保存到数据库。

在handle__publish函数里面检查主题前将发布的主题、载荷打印出来。

运行mosquitto服务器,另外一个终端运行mosquitto_pub, 在connect mosqpub发布端链接进来和disconnect断开链接之间打印出消息

读出数据后就能够将数据保存到数据库

 

/mosquitto-1.5.5/src下面的Makefile文件在总目标前加上sqlite3库的位置以及sqlite3.h头文件的位置

LDFLAGS是告诉连接器从哪里寻找库文件,LDFLAGS:在里面指定库文件的位置,CFLAGS 表示用于 C 编译器的选项,CFLAGS: 指定头文件(.h文件)的路径

在Makefile文件总目标前

LDFLAGS+=-L/usr/local/lib -lsqlite3

BROKER_CFLAGS+=-I/usr/local/include

 

树莓派下用mosquitto库写的客户端,每隔30秒给服务器发布温度

服务器读到主题为温度,解析载荷并插入数据库

查看数据库

3、遇到的问题

主要仍是对gcc和Makefile的不了解,平时本身写着的程序gcc命令也就一点,本身写的makefile也很简陋,分析修改源码看到这么长的Makefile就傻眼了,仍是基础不牢,我会好好再了解一下gcc和Makefile。