mosquitto服务器不保存数据,只是转发,若是想要保存数据只能对源码进行修改,想实现mosquitto服务器(broker)接收到发布端(pub)的数据后将收到数据保存到数据库,等到有订阅端订阅主题就能够调数据库里相对应的主题(topic)的数据发送给客户端(sub)。html
几个常看到的结构体便于源码理解:sql
struct mosquitto用来保存一个客户端链接的全部信息,如用户名、密码、用户ID、向该客户端发送的消息等数据库
struct mosquitto{数组
mosq_sock_t sock; //链接套接字安全
enum mosquitto__protocol protocol; //客户端使用的协议版本号服务器
char *address; //客户端ip数据结构
char *id; //客户端id异步
char *username; //客户端的用户名socket
char *password; //客户端密码(安全认证时使用)函数
uint16_t keepalive; //保活时间
uint16_t last_mid;
enum mosquitto_client_state state; //客户端的状态
time_t last_msg_in; //收到的上一条消息的时间
time_t next_msg_out; //下一条待发送的消息的时间
time_t ping_t; //发送ping request的时间间隔
struct mosquitto__packet in_packet; //收到的报文
struct mosquitto__packet *out_packet; //发送报文链表
struct mosquitto_message *will; //遗嘱消息链表
struct mosquitto_message_all *in_messages; //收到的消息链表
struct mosquitto_message_all *out_messages; //发送的消息链表
.....
}
struct mosquitto_message{
int mid;
char *topic;
void *payload;
int payloadlen;
int qos;
bool retain;
};
struct _mosquitto_subhier在mosquitto_broker.h中定义,用于保存订阅树的节点(包括叶子节点和中间节点),mosquitto对订阅树采用孩子 - 兄弟链表法的方式进行存储,该存储方式主要借助于数据结构struct _mosquitto_subhier来完成。
struct _mosquitto_subhier {
struct _mosquitto_subhier * children; //第一个孩子节点
struct _mosquitto_subhier * next; //下一个兄弟节点
struct _mosquitto_subleaf * subs; //订阅列表
char * topic; //该节点对应的主题片断
struct mosquitto_msg_store * retain; //该主题下被保留标记的消息
};
struct mosquitto_msg_store{
struct mosquitto_msg_store *next;
struct mosquitto_msg_store *prev;
dbid_t db_id;
char *source_id;
char **dest_ids;
int dest_id_count;
int ref_count;
char* topic;
mosquitto__payload_uhpa payload;
uint32_t payloadlen;
uint16_t source_mid;
uint16_t mid;
uint8_t qos;
bool retain;
};
参考:http://www.360doc.com/content/19/0324/20/62984756_823882579.shtml
参考:https://blog.csdn.net/weixin_38498942/article/details/88680291
服务端的实现逻辑主要是在/lib和/src目录下,main函数所在文件是:/mosquitto-1.5.5/src/mosquitto.c,其大体流程是:
1. 调用net__broker_init函数,建立套接字。
2. 调用config__init函数,初始化服务端配置相关项。配置相关的结构体:struct mosquitto__config。
3. 调用config__parse_args,根据配置文件和命令行参数初始化服务端配置。
4. 调用db__open函数,主要是建立了订阅树。该变量是服务端最重要的结构体,维护了订阅树根节点,订阅客户端的索引,消息链表等。
5. 调用mosquitto_security_module_init函数,给db中安全认证相关的成员变量初始化。
6. 调用mosquitto_security_init,主要是函数指针调函数(上一步已经给函数指针赋值),实现安全认证相关的操做。
7. 根据config.listener_count数目,循环调用net__socket_listen函数,在该函数中建立监听套接字,设置套接字属性,并开始监听。
8. 接下来将全部config.listener[i]的套接字统一放到listensock数组中去管理;
9. 调用drop_privileges函数,若是以root身份运行,此函数将尝试更改成非特权用户和组。
10. 接下来是调用signal函数,为信号注册信号处理函数,实现消息的异步通知。
11.最后进入到mosquitto_main_loop函数,该函数的主要功能就是处理客户端的订阅以及发布等请求,客户端订阅的主题有发布调用db__message_write()发布消息。
12.db__message_write();根据qos质量不一样调用send_publish给已经订阅的客户端
13.send__publish();查看链接跟主题是否有效接着调用send__real_publish()
14.send__real_publish();调用packet__write_byte() 大概是装包,返回packet__queue();
15.packet__queue();也仍是对数据的封装,返回packet__write();
16.packet__write();调用net__write(),net__write()里若是是window调用write(),若是不是调用send(),最后发布出去,大体这样一个流程
可是db__message_write()是有订阅端订阅了对应的主题才会发布出去,若是没有订阅,单是发布,消息会被遗弃不会被发布出去。依旧在mosquitto_main_loop里loop_handle_reads_writes根据读写事件发送或接收数据包。
1.loop_handle_reads_writes(),使用多路复用poll监听事件,根据读写事件发送或接收数据包,当有事件可读调用packet__read()。
2.packet__read()根据mosq->in_packet.command读取有关packet的内容,而后调用handle__packet();
3.handle__packet()根据mosq->in_packet.command返回调用handle__publish()函数, case PUBLISH: return handle__publish(mosq);
4.handle__publish()须要发布的数据进入到handle__publish函数,函数的最后根据qos服务质量调用message__cleanup()清空掉,因此在handle__publish函数能够读取到发布端发布的消息即便没有被订阅。在这能够将发布的数据保存到数据库。
在handle__publish函数里面检查主题前将发布的主题、载荷打印出来。
运行mosquitto服务器,另外一个终端运行mosquitto_pub, 在connect mosqpub发布端链接进来和disconnect断开链接之间打印出消息
读出数据后就能够将数据保存到数据库
/mosquitto-1.5.5/src下面的Makefile文件在总目标前加上sqlite3库的位置以及sqlite3.h头文件的位置
LDFLAGS是告诉连接器从哪里寻找库文件,LDFLAGS:在里面指定库文件的位置,CFLAGS 表示用于 C 编译器的选项,CFLAGS: 指定头文件(.h文件)的路径
在Makefile文件总目标前
LDFLAGS+=-L/usr/local/lib -lsqlite3
BROKER_CFLAGS+=-I/usr/local/include
树莓派下用mosquitto库写的客户端,每隔30秒给服务器发布温度
服务器读到主题为温度,解析载荷并插入数据库
查看数据库
主要仍是对gcc和Makefile的不了解,平时本身写着的程序gcc命令也就一点,本身写的makefile也很简陋,分析修改源码看到这么长的Makefile就傻眼了,仍是基础不牢,我会好好再了解一下gcc和Makefile。