在ESP8266学习系列中,博主一直使用HTTP协议。HTTP链接属于短链接,而在物联网应用中,普遍应用的倒是MQTT协议。因此,本篇咱们将学习Arduino平台上的MQTT实现库 —— PubSubClient。git
MQTT协议(Message Queuing Telemetry Transport),翻译过来就是遥信消息队列传输,是IBM公司于1999年提出的,如今最新版本是3.1.1。MQTT是一个基于TCP的发布订阅协议,设计的初始目的是为了极有限的内存设备和网络带宽很低的网络不可靠的通讯,很是适合物联网通讯。数组
MQTT属于应用层协议,基于TCP协议,确保了可靠性。博主在这里不会去详细讲述MQTT协议(网上讲解MQTT协议内容不少,不须要重复),但愿有兴趣的读者自行去阅读,可参考 MQTT中文文档。服务器
MQTT通讯模型以下:网络
MQTT消息支持三种QOS等级:数据结构
MQTT控制报文由三部分组成:less
总体上说,MQTT总体控制报文协议就是:dom
固定报头(必定有) + 可变报头(部分有) + 有效载荷(部分有)ide
数据结构简单,传输数据量小,这也是为何能应用于物联网应用的缘由之一。函数
注意点:工具
报文格式:
固定报头 + 可变报头 + 有效载荷
注意点:
报文格式:
固定报头 + 可变报头
注意点:
报文格式:
固定报头 + 可变报头 + 有效载荷
注意点:
报文格式:
固定报头 + 可变报头
注意点:
报文格式:
固定报头 + 可变报头
注意点:
报文格式:
固定报头 + 可变报头
注意点:
报文格式:
固定报头 + 可变报头
注意点:
报文格式:
固定报头 + 可变报头 + 有效载荷
注意点:
报文格式:
固定报头 + 可变报头 + 有效载荷
注意点:
报文格式:
固定报头 + 可变报头 + 有效载荷
注意点:
报文格式:
固定报头 + 可变报头
注意点:
报文格式:
固定报头
注意点:
报文格式:
固定报头
注意点:
报文格式:
固定报头
老规矩,上一个百度脑图:
函数1说明:
/** * 建立一个没有初始化的PubSubClient对象 */ PubSubClient::PubSubClient() { this->_state = MQTT_DISCONNECTED; this->_client = NULL; this->stream = NULL; setCallback(NULL); }
在使用PubSubClient对象以前,必须配置完整的内容:
WiFiClient espClient; PubSubClient client; void setup() { client.setClient(espClient); client.setServer("broker.example.com",1883); // client is now configured for use }
函数2说明:
/** * 建立一个部分初始化的PubSubClient对象 * @param client client实例 */ PubSubClient::PubSubClient(Client& client) { this->_state = MQTT_DISCONNECTED; setClient(client); this->stream = NULL; }
在使用PubSubClient对象以前,必须配置完整的内容:
WiFiClient espClient; PubSubClient client(espClient); void setup() { client.setServer("broker.example.com",1883); // client is now configured for use }
函数3说明:
/** * 建立完整初始化的PubSubClient对象 * @param addr mqtt服务器ip地址 * @param post mqtt服务器端口 * @param client 客户端实例 */ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) { this->_state = MQTT_DISCONNECTED; setServer(addr, port); setClient(client); this->stream = NULL; } /** * 建立完整初始化的PubSubClient对象 * @param addr mqtt服务器ip地址 * @param post mqtt服务器端口 * @param client 客户端实例 * @param stream 输出流,会把收到的消息输出到流中 */ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; setServer(addr,port); setClient(client); setStream(stream); } /** * 建立完整初始化的PubSubClient对象 * @param addr mqtt服务器ip地址 * @param post mqtt服务器端口 * @param MQTT_CALLBACK_SIGNATURE callback方法 * @param client 客户端实例 */ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { this->_state = MQTT_DISCONNECTED; setServer(addr, port); setCallback(callback); setClient(client); this->stream = NULL; } /** * 建立完整初始化的PubSubClient对象 * @param addr mqtt服务器ip地址 * @param post mqtt服务器端口 * @param MQTT_CALLBACK_SIGNATURE callback方法 * @param client 客户端实例 * @param stream 输出流,会把收到的消息输出到流中 */ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; setServer(addr,port); setCallback(callback); setClient(client); setStream(stream); } /** * 建立完整初始化的PubSubClient对象 * @param ip mqtt服务器ip地址 * @param post mqtt服务器端口 * @param client 客户端实例 */ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) { this->_state = MQTT_DISCONNECTED; setServer(ip, port); setClient(client); this->stream = NULL; } /** * 建立完整初始化的PubSubClient对象 * @param ip mqtt服务器ip地址 * @param post mqtt服务器端口 * @param client 客户端实例 * @param stream 输出流,会把收到的消息输出到流中 */ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; setServer(ip,port); setClient(client); setStream(stream); } /** * 建立完整初始化的PubSubClient对象 * @param ip mqtt服务器ip地址 * @param post mqtt服务器端口 * @param client 客户端实例 * @param MQTT_CALLBACK_SIGNATURE callback方法 */ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { this->_state = MQTT_DISCONNECTED; setServer(ip, port); setCallback(callback); setClient(client); this->stream = NULL; } /** * 建立完整初始化的PubSubClient对象 * @param ip mqtt服务器ip地址 * @param post mqtt服务器端口 * @param client 客户端实例 * @param MQTT_CALLBACK_SIGNATURE callback方法 * @param stream 输出流,会把收到的消息输出到流中 */ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; setServer(ip,port); setCallback(callback); setClient(client); setStream(stream); } /** * 建立完整初始化的PubSubClient对象 * @param domain mqtt服务器域名 * @param post mqtt服务器端口 * @param client 客户端实例 */ PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) { this->_state = MQTT_DISCONNECTED; setServer(domain,port); setClient(client); this->stream = NULL; } /** * 建立完整初始化的PubSubClient对象 * @param domain mqtt服务器域名 * @param post mqtt服务器端口 * @param client 客户端实例 * @param stream 输出流,会把收到的消息输出到流中 */ PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; setServer(domain,port); setClient(client); setStream(stream); } /** * 建立完整初始化的PubSubClient对象 * @param domain mqtt服务器域名 * @param post mqtt服务器端口 * @param client 客户端实例 * @param MQTT_CALLBACK_SIGNATURE callback方法 */ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { this->_state = MQTT_DISCONNECTED; setServer(domain,port); setCallback(callback); setClient(client); this->stream = NULL; } /** * 建立完整初始化的PubSubClient对象 * @param domain mqtt服务器域名 * @param post mqtt服务器端口 * @param client 客户端实例 * @param MQTT_CALLBACK_SIGNATURE callback方法 * @param stream 输出流,会把收到的消息输出到流中 */ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; setServer(domain,port); setCallback(callback); setClient(client); setStream(stream); }
函数说明:
/** * 配置服务器 * @param ip MQTT服务器ip地址,数组 * @param port MQTT服务器端口 */ PubSubClient& PubSubClient::setServer(uint8_t * ip, uint16_t port) { IPAddress addr(ip[0],ip[1],ip[2],ip[3]); return setServer(addr,port); } /** * 配置服务器 * @param ip MQTT服务器ip地址,IPAddress * @param port MQTT服务器端口 */ PubSubClient& PubSubClient::setServer(IPAddress ip, uint16_t port) { this->ip = ip; this->port = port; this->domain = NULL; return *this; } /** * 配置服务器 * @param domain MQTT服务器domain地址 * @param port MQTT服务器端口 */ PubSubClient& PubSubClient::setServer(const char * domain, uint16_t port) { this->domain = domain; this->port = port; return *this; }
注意:
函数说明:
/** * 配置客户端 * @param client client实例,好比wificlient */ PubSubClient& PubSubClient::setClient(Client& client){ this->_client = &client; return *this; }
注意:
函数说明
/** * 配置流,可用于存储消息内容 */ PubSubClient& PubSubClient::setStream(Stream& stream){ this->stream = &stream; return *this; }
注意:
函数说明
/** * 判断client是否链接上服务器 * @return bool true表示链接上 */ boolean PubSubClient::connected() { boolean rc; if (_client == NULL ) { rc = false; } else { rc = (int)_client->connected(); if (!rc) { //判断链接状态 if (this->_state == MQTT_CONNECTED) { this->_state = MQTT_CONNECTION_LOST; _client->flush(); _client->stop(); } } } return rc; }
函数说明:
/** * 链接MQTT服务(CONNECT控制报文) * @param id client端标识符 * @return bool 是否链接成功 */ boolean PubSubClient::connect(const char *id) { return connect(id,NULL,NULL,0,0,0,0,1); } /** * 链接MQTT服务(CONNECT控制报文) * @param id client端标识符 * @param user 用户帐号 * @param pass 用户密码 * @return bool 是否链接成功 */ boolean PubSubClient::connect(const char *id, const char *user, const char *pass) { return connect(id,user,pass,0,0,0,0,1); } /** * 链接MQTT服务(CONNECT控制报文) * @param id client端标识符 * @param willTopic 遗嘱主题 * @param willQos 遗嘱消息质量等级 * @param willRetain 是否保留信息 * @param willMessage 遗嘱内容 * @return bool 是否链接成功 */ boolean PubSubClient::connect(const char *id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) { return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage,1); } /** * 链接MQTT服务(CONNECT控制报文) * @param id client端标识符 * @param user 用户帐号 * @param pass 用户密码 * @param willTopic 遗嘱主题 * @param willQos 遗嘱消息质量等级 * @param willRetain 是否保留信息 * @param willMessage 遗嘱内容 * @return bool 是否链接成功 */ boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) { return connect(id,user,pass,willTopic,willQos,willRetain,willMessage,1); } /** * 链接MQTT服务(CONNECT控制报文) * @param id client端标识符 * @param user 用户帐号 * @param pass 用户密码 * @param willTopic 遗嘱主题 * @param willQos 遗嘱消息质量等级 * @param willRetain 是否保留信息 * @param willMessage 遗嘱内容 * @param cleanSession 是否清除会话 * @return bool 是否链接成功 * * @Note 注意结合CONNECT和CONNACK报文协议 */ boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage, boolean cleanSession) { if (!connected()) { int result = 0; if (domain != NULL) { result = _client->connect(this->domain, this->port); } else { result = _client->connect(this->ip, this->port); } if (result == 1) { nextMsgId = 1; // Leave room in the buffer for header and variable length field //固定报头 uint16_t length = MQTT_MAX_HEADER_SIZE; unsigned int j; //在固定CONNECT报文可变报头包含四个字段,协议名、协议级别、链接标志、保持链接: #if MQTT_VERSION == MQTT_VERSION_3_1 uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p', MQTT_VERSION}; #define MQTT_HEADER_VERSION_LENGTH 9 #elif MQTT_VERSION == MQTT_VERSION_3_1_1 uint8_t d[7] = {0x00,0x04,'M','Q','T','T',MQTT_VERSION}; #define MQTT_HEADER_VERSION_LENGTH 7 #endif for (j = 0;j<MQTT_HEADER_VERSION_LENGTH;j++) { buffer[length++] = d[j]; } /******************** 链接标志(Connect Flags) start *************************/ uint8_t v; //遗嘱主题 if (willTopic) { v = 0x04|(willQos<<3)|(willRetain<<5); } else { v = 0x00; } //清除会话 if (cleanSession) { v = v|0x02; } //帐号密码 if(user != NULL) { v = v|0x80; if(pass != NULL) { v = v|(0x80>>1); } } buffer[length++] = v; /***********************链接标志(Connect Flags) end ******************************************************/ /******************** 保持链接(Keep Alive) start *************************/ buffer[length++] = ((MQTT_KEEPALIVE) >> 8); buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); /******************** 保持链接(Keep Alive) end *************************/ //有效载荷 CONNECT报文的有效载荷包含一个或多个以长度为前缀的字段,可变报头中的标志决定是否包含这些字段。 //若是包含的话,必须按照这个顺序出现:client标识符,遗嘱主题,遗嘱消息,用户名,密码。 /*********** client标识符 start ************/ CHECK_STRING_LENGTH(length,id) length = writeString(id,buffer,length); /*********** client标识符 end ************/ /********* 遗嘱主题 遗嘱消息 start ********/ if (willTopic) { CHECK_STRING_LENGTH(length,willTopic) length = writeString(willTopic,buffer,length); CHECK_STRING_LENGTH(length,willMessage) length = writeString(willMessage,buffer,length); } /********* 遗嘱主题 遗嘱消息 end ********/ /********* 用户名 密码 start ********/ if(user != NULL) { CHECK_STRING_LENGTH(length,user) length = writeString(user,buffer,length); if(pass != NULL) { CHECK_STRING_LENGTH(length,pass) length = writeString(pass,buffer,length); } } /********* 用户名 密码 end ********/ //拼装 CONNECT消息 write(MQTTCONNECT,buffer,length-MQTT_MAX_HEADER_SIZE); lastInActivity = lastOutActivity = millis(); //等待MQTT服务器返回响应内容 while (!_client->available()) { unsigned long t = millis(); //判断是否超时 if (t-lastInActivity >= ((int32_t) MQTT_SOCKET_TIMEOUT*1000UL)) { _state = MQTT_CONNECTION_TIMEOUT; _client->stop(); return false; } } uint8_t llen; //读取响应返回的内容 uint16_t len = readPacket(&llen); /*** 处理 CONNACK – 确认链接请求 报文 ***/ if (len == 4) { if (buffer[3] == 0) { lastInActivity = millis(); pingOutstanding = false; _state = MQTT_CONNECTED; return true; } else { _state = buffer[3]; } } _client->stop(); } else { _state = MQTT_CONNECT_FAILED; } return false; } return true; }
注意:
函数说明:
/** * 断开链接 * 客户端断开链接(客户端发给服务端的最后一个控制报文。表示客户端正常断开链接) * @Note 取消订阅报文格式: 固定报头(报文类型+剩余长度) */ void PubSubClient::disconnect() { /*** 断开链接 报文协议 ***/ buffer[0] = MQTTDISCONNECT; buffer[1] = 0; _client->write(buffer,2); _state = MQTT_DISCONNECTED; _client->flush(); _client->stop(); lastInActivity = lastOutActivity = millis(); }
函数说明:
/** * 订阅主题 * @param topic 主题 * @return bool 是否订阅成功 */ boolean PubSubClient::subscribe(const char* topic) { return subscribe(topic, 0); } /** * 订阅主题(客户端向服务端发送SUBSCRIBE报文用于建立一个或多个订阅) * @param topic 主题 * @param qos 质量等级 * @return bool 是否订阅成功 * * @Note 订阅报文格式: 固定报头 + 可变报头(报文标识符)+ 有效载荷(主题过滤器) * 订阅确认报文格式:固定报头 可变报头(报文标识符)+ 有效载荷(返回码清单) */ boolean PubSubClient::subscribe(const char* topic, uint8_t qos) { if (qos > 1) { return false; } if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) { // Too long return false; } if (connected()) { // Leave room in the buffer for header and variable length field uint16_t length = MQTT_MAX_HEADER_SIZE; nextMsgId++; if (nextMsgId == 0) { nextMsgId = 1; } buffer[length++] = (nextMsgId >> 8); buffer[length++] = (nextMsgId & 0xFF); length = writeString((char*)topic, buffer,length); buffer[length++] = qos; return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-MQTT_MAX_HEADER_SIZE); } return false; }
函数说明:
/** * 取消订阅主题(客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题) * @param topic 具体主题 * @return bool 是否取消成功 * @Note 取消订阅报文格式: 固定报头(报文类型+剩余长度) + 可变报头(报文标识符)+ 有效载荷(主题过滤器列表) */ boolean PubSubClient::unsubscribe(const char* topic) { if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) { // Too long return false; } if (connected()) { uint16_t length = MQTT_MAX_HEADER_SIZE; nextMsgId++; if (nextMsgId == 0) { nextMsgId = 1; } //可变报头(报文标识符) buffer[length++] = (nextMsgId >> 8); buffer[length++] = (nextMsgId & 0xFF); length = writeString(topic, buffer,length); return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-MQTT_MAX_HEADER_SIZE); } return false; }
函数说明:
/** * 发布对应主题消息 * @param topic 主题 * @param payload 有效负载 */ boolean PubSubClient::publish(const char* topic, const char* payload) { return publish(topic,(const uint8_t*)payload,strlen(payload),false); } /** * 发布对应主题消息 * @param topic 主题 * @param payload 有效负载 * @param retained 是否保持 */ boolean PubSubClient::publish(const char* topic, const char* payload, boolean retained) { return publish(topic,(const uint8_t*)payload,strlen(payload),retained); } /** * 发布对应主题消息 * @param topic 主题 * @param payload 有效负载 * @param plength 负载内容长度 */ boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength) { return publish(topic, payload, plength, false); } /** * 发布对应主题消息 * @param topic 主题 * @param payload 有效负载 * @param plength 负载内容长度 * @param retained 是否保持 */ boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) { if (connected()) { if (MQTT_MAX_PACKET_SIZE < MQTT_MAX_HEADER_SIZE + 2+strlen(topic) + plength) { // Too long return false; } // Leave room in the buffer for header and variable length field uint16_t length = MQTT_MAX_HEADER_SIZE; length = writeString(topic,buffer,length); uint16_t i; for (i=0;i<plength;i++) { buffer[length++] = payload[i]; } uint8_t header = MQTTPUBLISH; if (retained) { header |= 1; } return write(header,buffer,length-MQTT_MAX_HEADER_SIZE); } return false; } /** * 发布对应主题消息 * @param topic 主题 * @param payload 有效负载(F(xxx)) * @param retained 是否保持 */ boolean PubSubClient::publish_P(const char* topic, const char* payload, boolean retained) { return publish_P(topic, (const uint8_t*)payload, strlen(payload), retained); } /** * 发布对应主题消息 * @param topic 主题 * @param payload 有效负载(F(xxx)) * @param plength 负载内容长度 * @param retained 是否保持 */ boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) { uint8_t llen = 0; uint8_t digit; unsigned int rc = 0; uint16_t tlen; unsigned int pos = 0; unsigned int i; uint8_t header; unsigned int len; if (!connected()) { return false; } tlen = strlen(topic); header = MQTTPUBLISH; if (retained) { header |= 1; } buffer[pos++] = header; len = plength + 2 + tlen; do { digit = len % 128; len = len / 128; if (len > 0) { digit |= 0x80; } buffer[pos++] = digit; llen++; } while(len>0); pos = writeString(topic,buffer,pos); rc += _client->write(buffer,pos); for (i=0;i<plength;i++) { rc += _client->write((char)pgm_read_byte_near(payload + i)); } lastOutActivity = millis(); return rc == tlen + 4 + plength; }
函数说明:
/** * 设置消息回调函数 * @param MQTT_CALLBACK_SIGNATURE */ PubSubClient& PubSubClient::setCallback(MQTT_CALLBACK_SIGNATURE) { this->callback = callback; return *this; }
注意:
#if defined(ESP8266) || defined(ESP32) #include <functional> #define MQTT_CALLBACK_SIGNATURE std::function<void(char*, uint8_t*, unsigned int)> callback #else #define MQTT_CALLBACK_SIGNATURE void (*callback)(char*, uint8_t*, unsigned int) #endif
函数说明:
/** * 处理消息以及保持心跳 */ boolean PubSubClient::loop() { if (connected()) { unsigned long t = millis(); if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) { if (pingOutstanding) { this->_state = MQTT_CONNECTION_TIMEOUT; _client->stop(); return false; } else { /*** PINGREQ——心跳请求 ****/ buffer[0] = MQTTPINGREQ; buffer[1] = 0; _client->write(buffer,2); lastOutActivity = t; lastInActivity = t; pingOutstanding = true; } } if (_client->available()) { uint8_t llen; uint16_t len = readPacket(&llen); uint16_t msgId = 0; uint8_t *payload; if (len > 0) { lastInActivity = t; uint8_t type = buffer[0]&0xF0; if (type == MQTTPUBLISH) { //服务端发布消息 if (callback) { uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2]; /* topic length in bytes */ memmove(buffer+llen+2,buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */ buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */ char *topic = (char*) buffer+llen+2; // msgId only present for QOS>0 if ((buffer[0]&0x06) == MQTTQOS1) { msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1]; payload = buffer+llen+3+tl+2; callback(topic,payload,len-llen-3-tl-2); //客户端发布应答 buffer[0] = MQTTPUBACK; buffer[1] = 2; buffer[2] = (msgId >> 8); buffer[3] = (msgId & 0xFF); _client->write(buffer,4); lastOutActivity = t; } else { payload = buffer+llen+3+tl; callback(topic,payload,len-llen-3-tl); } } } else if (type == MQTTPINGREQ) { /**** PINGRESP-心跳响应 ****/ buffer[0] = MQTTPINGRESP; buffer[1] = 0; _client->write(buffer,2); } else if (type == MQTTPINGRESP) { /**** PINGRESP-心跳响应 ****/ pingOutstanding = false; } } else if (!connected()) { // readPacket has closed the connection return false; } } return true; } return false; }
上面的发布消息方法都是小数据发布,何为小数据呢?直接看buffer大小吧:
// MQTT_MAX_PACKET_SIZE : Maximum packet size #ifndef MQTT_MAX_PACKET_SIZE #define MQTT_MAX_PACKET_SIZE 128 #endif
默认的buffer是128字节,固然博主不推荐你们发送大量数据。
若是要发布稍微大一点的数据,就得用到三部曲方法。
函数说明:
/** * 发布大数据(第1步) —— 只是固定报头 + 剩余长度 * @param topic 主题 * @param plength 负载内容长度 * @param retained 是否保持 * * @Note 这里仍是用到buffer[128] */ boolean PubSubClient::beginPublish(const char* topic, unsigned int plength, boolean retained) { if (connected()) { // Send the header and variable length field uint16_t length = MQTT_MAX_HEADER_SIZE; length = writeString(topic,buffer,length); uint8_t header = MQTTPUBLISH; if (retained) { header |= 1; } size_t hlen = buildHeader(header, buffer, plength+length-MQTT_MAX_HEADER_SIZE); uint16_t rc = _client->write(buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen)); lastOutActivity = millis(); return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen))); } return false; }
函数说明:
/** * 发布大数据(第2步) —— 有效负载 * @param buffer 内容 * @param size 负载内容长度 * * @Note 这里仍是用到buffer[128] */ size_t PubSubClient::write(const uint8_t *buffer, size_t size) { lastOutActivity = millis(); return _client->write(buffer,size); }
函数说明:
/** * 发布大数据(第3步) 感受没什么用的方法.... */ int PubSubClient::endPublish() { return 1; }
函数说明:
//状态定义 // Possible values for client.state() #define MQTT_CONNECTION_TIMEOUT -4 #define MQTT_CONNECTION_LOST -3 #define MQTT_CONNECT_FAILED -2 #define MQTT_DISCONNECTED -1 #define MQTT_CONNECTED 0 #define MQTT_CONNECT_BAD_PROTOCOL 1 #define MQTT_CONNECT_BAD_CLIENT_ID 2 #define MQTT_CONNECT_UNAVAILABLE 3 #define MQTT_CONNECT_BAD_CREDENTIALS 4 #define MQTT_CONNECT_UNAUTHORIZED 5 /** * 获取Mqtt客户端当前状态 */ int PubSubClient::state() { return this->_state; }
工具讲完了,咱们直接看库自带的示例(后面博主会结合OneNet来使用MQTT 敬请期待)。
案例说明:
案例代码:
#include <ESP8266WiFi.h> #include <PubSubClient.h> // Update these with values suitable for your network. const char* ssid = "........";//wifi帐号 const char* password = "........";//wifi秘密 const char* mqtt_server = "broker.mqtt-dashboard.com";//mqtt服务器 WiFiClient espClient; PubSubClient client(espClient); long lastMsg = 0; char msg[50]; int value = 0; void setup_wifi() { delay(10); // We start by connecting to a WiFi network Serial.println(); Serial.print("Connecting to "); Serial.println(ssid); WiFi.begin(ssid, password); while (WiFi.status() != WL_CONNECTED) { delay(500); Serial.print("."); } randomSeed(micros()); Serial.println(""); Serial.println("WiFi connected"); Serial.println("IP address: "); Serial.println(WiFi.localIP()); } /** * 消息回调 */ void callback(char* topic, byte* payload, unsigned int length) { Serial.print("Message arrived ["); Serial.print(topic); Serial.print("] "); for (int i = 0; i < length; i++) { Serial.print((char)payload[i]); } Serial.println(); // Switch on the LED if an 1 was received as first character if ((char)payload[0] == '1') { digitalWrite(BUILTIN_LED, LOW); // Turn the LED on (Note that LOW is the voltage level // but actually the LED is on; this is because // it is active low on the ESP-01) } else { digitalWrite(BUILTIN_LED, HIGH); // Turn the LED off by making the voltage HIGH } } /** * 断开重连 */ void reconnect() { // Loop until we're reconnected while (!client.connected()) { Serial.print("Attempting MQTT connection..."); // Create a random client ID String clientId = "ESP8266Client-"; clientId += String(random(0xffff), HEX); // Attempt to connect if (client.connect(clientId.c_str())) { Serial.println("connected"); // Once connected, publish an announcement... client.publish("outTopic", "hello world"); // ... and resubscribe client.subscribe("inTopic"); } else { Serial.print("failed, rc="); Serial.print(client.state()); Serial.println(" try again in 5 seconds"); // Wait 5 seconds before retrying delay(5000); } } } void setup() { pinMode(BUILTIN_LED, OUTPUT); // Initialize the BUILTIN_LED pin as an output Serial.begin(115200); setup_wifi(); //配置mqtt服务器地址和端口 client.setServer(mqtt_server, 1883); //设置订阅消息回调 client.setCallback(callback); } void loop() { //重连机制 if (!client.connected()) { reconnect(); } //不断监听信息 client.loop(); long now = millis(); if (now - lastMsg > 2000) { //每2s发布一次信息 lastMsg = now; ++value; snprintf (msg, 50, "hello world #%ld", value); Serial.print("Publish message: "); Serial.println(msg); client.publish("outTopic", msg); } }
案例说明:
案例代码:
#include <ESP8266WiFi.h> #include <PubSubClient.h> const char* ssid = "........"; const char* password = "........"; const char* mqtt_server = "broker.mqtt-dashboard.com"; WiFiClient espClient; void callback(char* topic, byte* payload, unsigned int length) { // handle message arrived } PubSubClient client(mqtt_server, 1883, callback, espClient); void setup_wifi() { delay(10); // We start by connecting to a WiFi network Serial.println(); Serial.print("Connecting to "); Serial.println(ssid); WiFi.begin(ssid, password); while (WiFi.status() != WL_CONNECTED) { delay(500); Serial.print("."); } Serial.println(""); Serial.println("WiFi connected"); Serial.println("IP address: "); Serial.println(WiFi.localIP()); } void setup() { setup_wifi(); // Note - the default maximum packet size is 128 bytes. If the // combined length of clientId, username and password exceed this, // you will need to increase the value of MQTT_MAX_PACKET_SIZE in // PubSubClient.h if (client.connect("arduinoClient", "testuser", "testpass")) { client.publish("outTopic","hello world"); client.subscribe("inTopic"); } } void loop() { client.loop(); }
案例说明:
案例代码:
#include <ESP8266WiFi.h> #include <PubSubClient.h> // Update these with values suitable for your network. const char* ssid = "........"; const char* password = "........"; const char* mqtt_server = "broker.mqtt-dashboard.com"; WiFiClient espClient; PubSubClient client(espClient); void setup_wifi() { delay(10); // We start by connecting to a WiFi network Serial.println(); Serial.print("Connecting to "); Serial.println(ssid); WiFi.begin(ssid, password); while (WiFi.status() != WL_CONNECTED) { delay(500); Serial.print("."); } randomSeed(micros()); Serial.println(""); Serial.println("WiFi connected"); Serial.println("IP address: "); Serial.println(WiFi.localIP()); } void callback(char* topic, byte* payload, unsigned int length) { Serial.print("Message arrived ["); Serial.print(topic); Serial.print("] "); for (int i = 0; i < length; i++) { Serial.print((char)payload[i]); } Serial.println(); // Find out how many bottles we should generate lyrics for String topicStr(topic); int bottleCount = 0; // assume no bottles unless we correctly parse a value from the topic if (topicStr.indexOf('/') >= 0) { // The topic includes a '/', we'll try to read the number of bottles from just after that topicStr.remove(0, topicStr.indexOf('/')+1); // Now see if there's a number of bottles after the '/' bottleCount = topicStr.toInt(); } if (bottleCount > 0) { // Work out how big our resulting message will be int msgLen = 0; for (int i = bottleCount; i > 0; i--) { String numBottles(i); msgLen += 2*numBottles.length(); if (i == 1) { msgLen += 2*String(" green bottle, standing on the wall\n").length(); } else { msgLen += 2*String(" green bottles, standing on the wall\n").length(); } msgLen += String("And if one green bottle should accidentally fall\nThere'll be ").length(); switch (i) { case 1: msgLen += String("no green bottles, standing on the wall\n\n").length(); break; case 2: msgLen += String("1 green bottle, standing on the wall\n\n").length(); break; default: numBottles = i-1; msgLen += numBottles.length(); msgLen += String(" green bottles, standing on the wall\n\n").length(); break; }; } // 显示开始发布大数据 client.beginPublish("greenBottles/lyrics", msgLen, false); for (int i = bottleCount; i > 0; i--) { for (int j = 0; j < 2; j++) { client.print(i); if (i == 1) { client.print(" green bottle, standing on the wall\n"); } else { client.print(" green bottles, standing on the wall\n"); } } client.print("And if one green bottle should accidentally fall\nThere'll be "); switch (i) { case 1: client.print("no green bottles, standing on the wall\n\n"); break; case 2: client.print("1 green bottle, standing on the wall\n\n"); break; default: client.print(i-1); client.print(" green bottles, standing on the wall\n\n"); break; }; } // 发布完毕 client.endPublish(); } } /** * 重连机制 */ void reconnect() { // Loop until we're reconnected while (!client.connected()) { Serial.print("Attempting MQTT connection..."); // Create a random client ID String clientId = "ESP8266Client-"; clientId += String(random(0xffff), HEX); // Attempt to connect if (client.connect(clientId.c_str())) { Serial.println("connected"); // Once connected, publish an announcement... client.publish("outTopic", "hello world"); // ... and resubscribe client.subscribe("greenBottles/#"); } else { Serial.print("failed, rc="); Serial.print(client.state()); Serial.println(" try again in 5 seconds"); // Wait 5 seconds before retrying delay(5000); } } } void setup() { pinMode(BUILTIN_LED, OUTPUT); // Initialize the BUILTIN_LED pin as an output Serial.begin(115200); setup_wifi(); client.setServer(mqtt_server, 1883); client.setCallback(callback); } void loop() { if (!client.connected()) { reconnect(); } client.loop(); }
整体来讲,MQTT协议简单,很是容易上手使用,但愿读者也能就着协议理解一下源码知识,敬请期待OneNet篇关于MQTT的使用。