https://www.cnblogs.com/homejim/p/8196763.htmljavascript
在文章Paho - MQTT C Cient的实现中,我介绍了如何使用Paho开源项目建立MQTTClient_pulish客户端。但只是简单的介绍了使用方法,并且客户端的结果与以前介绍的并不吻合,今天我就结合新的例子,给你们讲解一下Paho使用MQTT客户端的主要过程。
如同前面介绍的,MQTT客户端分为同步客户端和异步客户端。今天主要讲解的是同步客户端,结构仍是如同步客户端中介绍的:html
1.建立一个客户端对象;
2.设置链接MQTT服务器的选项;
3.若是多线程(异步模式)操做被使用则设置回调函数(详见 Asynchronous >vs synchronous client applications);
4.订阅客户端须要接收的任意话题;
5.重复如下操做直到结束:
a.发布客户端须要的任意信息;
b.处理全部接收到的信息;
6.断开客户端链接;
7.释放客户端使用的全部内存。java
好,直接上代码,MQTT简单的同步客户端。nginx
#include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include "MQTTClient.h" #if !defined(WIN32) #include <unistd.h> #else #include <windows.h> #endif #define NUM_THREADS 2 #define ADDRESS "tcp://localhost:1883" //更改此处地址 #define CLIENTID "aaabbbccc_pub" //更改此处客户端ID #define SUB_CLIENTID "aaabbbccc_sub" //更改此处客户端ID #define TOPIC "topic01" //更改发送的话题 #define PAYLOAD "Hello Man, Can you see me ?!" // #define QOS 1 #define TIMEOUT 10000L #define USERNAME "test_user" #define PASSWORD "jim777" #define DISCONNECT "out" int CONNECT = 1; volatile MQTTClient_deliveryToken deliveredtoken; void delivered(void *context, MQTTClient_deliveryToken dt) { printf("Message with token value %d delivery confirmed\n", dt); deliveredtoken = dt; } int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) { int i; char* payloadptr; printf("Message arrived\n"); printf(" topic: %s\n", topicName); printf(" message: "); payloadptr = message->payload; if(strcmp(payloadptr, DISCONNECT) == 0){ printf(" \n out!!"); CONNECT = 0; } for(i=0; i<message->payloadlen; i++) { putchar(*payloadptr++); } printf("\n"); MQTTClient_freeMessage(&message); MQTTClient_free(topicName); return 1; } void connlost(void *context, char *cause) { printf("\nConnection lost\n"); printf(" cause: %s\n", cause); } void *subClient(void *threadid){ long tid; tid = (long)threadid; printf("Hello World! It's me, thread #%ld!\n", tid); MQTTClient client; MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; int rc; int ch; MQTTClient_create(&client, ADDRESS, SUB_CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL); conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; conn_opts.username = USERNAME; conn_opts.password = PASSWORD; MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered); if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) { printf("Failed to connect, return code %d\n", rc); exit(EXIT_FAILURE); } printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n" "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS); MQTTClient_subscribe(client, TOPIC, QOS); do { ch = getchar(); } while(ch!='Q' && ch != 'q'); MQTTClient_unsubscribe(client, TOPIC); MQTTClient_disconnect(client, 10000); MQTTClient_destroy(&client); pthread_exit(NULL); } void *pubClient(void *threadid){ long tid; tid = (long)threadid; int count = 0; printf("Hello World! It's me, thread #%ld!\n", tid); //声明一个MQTTClient MQTTClient client; //初始化MQTT Client选项 MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; //#define MQTTClient_message_initializer { {'M', 'Q', 'T', 'M'}, 0, 0, NULL, 0, 0, 0, 0 } MQTTClient_message pubmsg = MQTTClient_message_initializer; //声明消息token MQTTClient_deliveryToken token; int rc; //使用参数建立一个client,并将其赋值给以前声明的client MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL); conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; conn_opts.username = USERNAME; conn_opts.password = PASSWORD; //使用MQTTClient_connect将client链接到服务器,使用指定的链接选项。成功则返回MQTTCLIENT_SUCCESS if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) { printf("Failed to connect, return code %d\n", rc); exit(EXIT_FAILURE); } pubmsg.payload = PAYLOAD; pubmsg.payloadlen = strlen(PAYLOAD); pubmsg.qos = QOS; pubmsg.retained = 0; while(CONNECT){ MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token); printf("Waiting for up to %d seconds for publication of %s\n" "on topic %s for client with ClientID: %s\n", (int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID); rc = MQTTClient_waitForCompletion(client, token, TIMEOUT); printf("Message with delivery token %d delivered\n", token); usleep(3000000L); } MQTTClient_disconnect(client, 10000); MQTTClient_destroy(&client); } int main(int argc, char* argv[]) { pthread_t threads[NUM_THREADS]; long t; pthread_create(&threads[0], NULL, subClient, (void *)0); pthread_create(&threads[1], NULL, pubClient, (void *)1); pthread_exit(NULL); }
在代码中,我建立了两个线程,分别用来处理订阅客户端和发布客户端。windows
接下来我讲解一下这个简单的客户端,其中,大致的流程以下:
大致的流程如图所示,在客户端启动以后,会启动线程,建立一个订阅客户端,它会监听消息的到达,在消息到达以后会触发相应的回调函数以对消息进行处理;后在启动一个线程,建立一个发送客户端,用来发送消息的,每次发送消息以前会判断是否要掉线,如CONNECT=0则会掉线,不然发送消息给topic01。服务器
如下函数完成的是订阅的功能。session
void *subClient(void *threadid)
第一步:声明客户端,并经过函数给其赋值;多线程
MQTTClient client; MQTTClient_create(&client, ADDRESS, SUB_CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
第二步:设置链接MQTT服务器的选项;app
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
第三步:设置回调函数;less
MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered); //相应的回调函数connlost,msgarrvd,delivered个人代码中都有
第四步:使用客户端和链接选项链接服务器;
MQTTClient_connect(client, &conn_opts))
第五步订阅话题;
MQTTClient_subscribe(client, TOPIC, QOS);
第六步一直等待,知道输入'Q' 或'q';
do { ch = getchar(); } while(ch!='Q' && ch != 'q');
第六步一直等待,直到输入'Q' 或'q';
do { ch = getchar(); } while(ch!='Q' && ch != 'q');
第七步取消订阅;
MQTTClient_unsubscribe(client, TOPIC);
第八步.断开客户端链接;
MQTTClient_disconnect(client, 10000);
第九步.释放客户端使用的全部内存;
MQTTClient_destroy(&client);
至此,订阅客户端就结束了。通常订阅客户端的大致结构都是这样。不一样的是回调函数的个性化上。
如下函数完成的是发送的功能。
void *pubClient(void *threadid)
第一步:声明客户端,并经过函数给其赋值;
MQTTClient client;
MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
第二步:设置链接MQTT服务器的选项;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
第三步:使用客户端和链接选项链接服务器;
MQTTClient_connect(client, &conn_opts)
第四步设置发送消息的属性;
pubmsg.payload = PAYLOAD;
pubmsg.payloadlen = strlen(PAYLOAD); pubmsg.qos = QOS; pubmsg.retained = 0;
第五步循环发送消息;
MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
第六步一直等待,当CONNECT=0时退出该客户端;
第七步.断开客户端链接;
MQTTClient_disconnect(client, 10000);
第八步.释放客户端使用的全部内存;
MQTTClient_destroy(&client);
至此,发送客户端就结束了。通常的发送客户端大致结构也如此,但异步客户端可能有些许不一样,无非就是设计回调函数,而后在链接,断开链接等时可使用回调函数作一些操做而已,具体的能够本身研究。
为了让你们可以更深刻了解,我把本身学到的一些函数和结构体大体在下面讲解了一下。
MQTTClient
定义:typedef void* MQTTClient;
含义:表明MQTT客户端的句柄。成功调用MQTTClient_create()后,能够获得有效的客户端句柄。
MQTTClient_connectOptions
定义:
typedef struct { char struct_id[4];//结构体的识别序列,必须为MQTC int struct_version;//结构体版本 /** 在0,1,2,3,4,5中取值: 0-表示没有SSL选项且没有serverURIs; 1-表示没有serverURIs; 2-表示没有MQTTVersion 3-表示没有返回值; 4-表示没有二进制密码选项 */ int keepAliveInterval; /** 在这段时间内没有数据相关的消息时,客户端发送一个很是小的MQTT“ping”消息,服务器将会确认这个消息 */ int cleansession; /** 当cleansession为true时,会话状态信息在链接和断开链接时被丢弃。 将cleansession设置为false将保留会话状态信息 */ int reliable; /* 将该值设置为true意味着必须完成发布的消息(已收到确认),才能发送另外一个消息 */ MQTTClient_willOptions* will; /* 若是程序不使用最后的意愿和遗嘱功能,请将此指针设置为NULL。 */ const char* username;//用户名 const char* password;//密码 int connectTimeout;//容许尝试链接的过期时间 int retryInterval;//尝试重连的时间 MQTTClient_SSLOptions* ssl; /* 若是程序不使用最后的ssl,请将此指针设置为NULL。 */ int serverURIcount; char* const* serverURIs; /* 链接服务器的url,以protocol:// host:port为格式 */ int MQTTVersion; /* MQTT的版本,MQTTVERSION_3_1(3),MQTTVERSION_3_1_1 (4) */ struct { const char* serverURI; int MQTTVersion; int sessionPresent; } returned; struct { int len; const void* data; } binarypwd; } MQTTClient_connectOptions;
含义:用来设置MQTTClient的链接选项的结构体。
MQTTClient_message
定义:
typedef struct { char struct_id[4];//结构体的识别序列,必须为MQTM int struct_version;//结构体的版本,必须为0 int payloadlen;//MQTT信息的长度 void* payload;//指向消息负载的指针 int qos;//服务质量 int retained;//保留标志 int dup;dup//标志指示这个消息是不是重复的。 只有在收到QoS1消息时才有意义。 若是为true,则客户端应用程序应采起适当的措施来处理重复的消息。 int msgid;//消息标识符一般保留供MQTT客户端和服务器内部使用。 } MQTTClient_message;
含义:表明MQTT信息的结构体。
MQTTClient_create
定义:
DLLExport int MQTTClient_create( MQTTClient * handle, const char * serverURI, const char * clientId, int persistence_type, void * persistence_context )
做用:该函数建立了一个用于链接到特定服务器,使用特定持久存储的MQTT客户端。
参数 含义 handle 指向MQTT客户端句柄的指针。句柄被成功从函数中返回的客户端引用所填充 serverURI 以空结尾的字符串,其指定客户端将链接到的服务器。其格式为protocol://host:port。如今的(protocol)协议必须是tcp或ssl,而host能够指定为IP地址或域名。例如, 要使用默认 MQTT 端口链接到本地计算机上运行的服务器, 请指定为 tcp://localhost:1883。 clientId 客户端标识符(clientId)是一个以空结尾的 UTF-8 编码字符串,客户端链接到服务器时将它传递过去。 persistence_type 客户端所使用的持久类型。MQTTCLIENT_PERSISTENCE_NONE-使用内存持久化。若是客户端运行的设备或系统出故障或关闭, 则任何正在运行的消息的当前状态都将丢失, 甚至在 QoS1 和 QoS2 中也可能没法传递某些消息; MQTTCLIENT_PERSISTENCE_DEFAULT-使用默认的持久化机制(文件系统)。正在运行消息的状态被保存在持久存储中,以便在乎外出现时对消息的丢失提供一些保护; MQTTCLIENT_PERSISTENCE_USER-使用程序指定的持久化实现。使用这种类型,应用程序可对持久化机制进行控制,应用程序必须实现MQTTClient_persistence 接口。 persistence_context 若是应用程序使用的是MQTTCLIENT_PERSISTENCE_NONE持久化,该参数不使用,并且值应该设置为NULL。对于MQTTCLIENT_PERSISTENCE_DEFAULT持久化,应该设置持久化目录的位置(若是设置为NULL,则使用工做目录做为持久化目录)。使用MQTTCLIENT_PERSISTENCE_USER持久化,则将此参数指向有效的MQTTClient_persistence结构。
MQTTClient_setCallbacks
定义:
DLLExport int MQTTClient_setCallbacks ( MQTTClient handle, void * context, MQTTClient_connectionLost * cl, MQTTClient_messageArrived * ma, MQTTClient_deliveryComplete * dc )
做用:该函数为特定的客户端建立回调函数。若是您的客户端应用程序不使用特定的回调函数,请将相关参数设置为NULL。 调用MQTTClient_setCallbacks()使客户端进入多线程模式。 任何须要的消息确认和状态通讯都在后台处理,而不须要客户端应用程序的任何干预。
注意:在调用该函数时,MQTT客户端必须断开链接。(即先要调用该函数在链接客户端)。
| 参数 | 含义 |
| ---|-------------|
| handle | 指向MQTT客户端句柄的指针。句柄被成功从函数中返回的客户端引用所填充 |
| context| 指向任何应用程序特定上下文的指针。 上下文指针被传递给每一个回调函数,以提供对回调中的上下文信息的访问。|
|cl|指向MQTTClient_connectionLost()回调函数的指针。 若是您的应用程序不处理断开链接,您能够将其设置为NULL。|
|ma|指向MQTTClient_messageArrived()回调函数的指针。 当您调用MQTTClient_setCallbacks()时,必须指定此回调函数。|
|dc|指向MQTTClient_deliveryComplete()回调函数的指针。 若是您的应用程序同步发布,或者您不想检查是否成功发送,则能够将其设置为NULL。|
MQTTClient_connect
定义:
DLLExport int MQTTClient_connect ( MQTTClient handle, MQTTClient_connectOptions * options )
做用:此函数尝试使用指定的选项将先前建立的客户端链接到MQTT服务器。
参数 含义 handle 指向MQTT客户端句柄的指针。句柄被成功从函数中返回的客户端引用所填充 options 指向有效的MQTTClient_connectOptions结构的指针。
> | 返回值 |
---|---|
0 | 链接成功 |
1 | 拒绝链接:不可接受的协议版本。 |
2 | 拒绝链接:标识符被拒绝。 |
3 | 拒绝链接:服务器不可用。 |
4 | 拒绝链接:用户名或密码错误。 |
5 | 拒绝链接:未经受权。 |
6 | 保留给将来用。 |
MQTTClient_subscribe
定义:
DLLExport int MQTTClient_subscribe ( MQTTClient handle, const char * topic, int qos )
做用:此功能尝试将客户订阅到单个主题,该主题可能包含通配符。 此函数还指定服务质量。
参数 含义 handle 指向MQTT客户端句柄的指针。句柄被成功从函数中返回的客户端引用所填充 topic 订阅的主题,可以使用通配符。 qos 订阅的请求服务质量
MQTTClient_publishMessage
定义:
DLLExport int MQTTClient_publishMessage ( MQTTClient handle, const char * topicName, MQTTClient_message * msg, MQTTClient_deliveryToken * dt )
做用:此功能尝试将客户订阅到单个主题,该主题可能包含通配符。 此函数还指定服务质量。
参数 含义 handle 指向MQTT客户端句柄的指针。句柄被成功从函数中返回的客户端引用所填充 topicName 与信息相关的主题。 msg 指向有效的 MQTTClient_message 结构的指针, 其中包含要发布消息的有效负载和属性 dt 指向MQTTClient_deliveryToken的指针。当函数成功返回时,dt会被赋值为表明消息的token。若是程序中没有使用传递token,将其设置为NULL。
MQTTClient_waitForCompletion
定义:
DLLExport int MQTTClient_waitForCompletion ( MQTTClient handle, MQTTClient_deliveryToken dt, unsigned long timeout )
做用:客户端应用程序调用此函数来将主线程的执行与消息的完成发布同步。 被调用时,MQTTClient_waitForCompletion()阻塞执行,直到消息成功传递或已超过指定的时间。
参数 含义 handle 指向MQTT客户端句柄的指针。句柄被成功从函数中返回的客户端引用所填充 dt 表明消息的MQTTClient_deliveryToken用来检测是否成功传递。传递token由发布函数MQTTClient_publish () 和 MQTTClient_publishMessage ()所产生。 timeout 等待的最大毫秒数。 返回值:
消息成功传递则返回MQTTCLIENT_SUCCESS(0) ,若是时间已过时或检测token时出问题,则返回错误码。
对paho客户端的讲解就到此结束了,若有不明白的,能够给我留言,一块儿讨论,一块儿进步。