mosquitto在Linux环境下的部署 看了有三四天的的源码,(固然没怎么好好看了),忽然发现对mosquitto的源码有了一点点感受,因而在第五天决定在Linux环境下部署mosquitto。 使用传统源码安装步骤: 步骤1:http://mosquitto.org/files/source/官网下载源码,放到Linux环境中。解压后,找到主要配置文件config.mk,其中包含mosquitto的安装选项,须要注意的是,默认状况下mosquitto的安装须要OpenSSL(一个强大的安全套接字层密码库)的支持,若不须要SSL,则须要关闭config.mk里面与SSL功能有关的选项(WITH_TLS、WITH_TLS_PSK)。笔者这里直接将这两句话屏蔽掉了。 步骤2 :配置完毕后,输入“make install”进行安装(须要root权限),这里编译失败出现了一个问题: error while loading shared libraries:libmosquitto.so.1 : cannot open shared object file: No such file or directory 因此问题很清楚,没有找到这个动态连接库。遇到这种问题就有两种状况: 1).确实没有这个库或者库的版本不对 。 2)这个库所在的路径不在系统查找范围内。 笔者感受这个库名字很眼熟,果真在“make install”命令执行的打印信息中发现蛛丝马迹: “install -s --strip-program=strip libmosquitto.so.1 /usr/local/lib/libmosquitto.so.1” 笔者在这个路径下,找到了该动态库,说明如今的问题应该是属于第二种状况(并且是官方的代码,也不该该会犯第一种问题),因而在网上找到了解决方案。 1) 若是共享库文件安装到了/lib或/usr/lib目录下, 那么需执行一下ldconfig命令 ldconfig命令的用途, 主要是在默认搜寻目录(/lib和/usr/lib)以及动态库配置文件/etc/ld.so.conf内所列的目录下, 搜索出可共享的动态连接库(格式如lib*.so*), 进而建立出动态装入程序(ld.so)所需的链接和缓存文件. 缓存文件默认为/etc/ld.so.cache, 此文件保存已排好序的动态连接库名字列表. 2) 若是共享库文件安装到了/usr/local/lib(不少开源的共享库都会安装到该目录下)或其它"非/lib或/usr/lib"目录下, 那么在执行ldconfig命令前, 还要把新共享库目录加入到共享库配置文件/etc/ld.so.conf中, 以下:(须要root权限执行下面命令) # cat /etc/ld.so.conf include ld.so.conf.d/*.conf # echo "/usr/local/lib" >> /etc/ld.so.conf # ldconfig (详情参阅http://blog.chinaunix.net/uid-26212859-id-3256667.html) 这里笔者就是使用第二种状况的办法,成功完成编译。 完成后会在系统命令行里发现mosquitto、mosquitto_sub、mosquitto_pub三个工具(网上说有四个,还有一个mosquitto_passwd,用于管理密码,应该是关闭SSL的缘由),分别用于启动代理、订阅消息和发布消息。测试 步骤1:开启一个终端:输入“mosquitto”命令,结果以下图,服务启动,由于一直监听,因此不会看到命令行。 正常状况
输入“mosquitto”若是以下图报错,发现报错是地址已被使用,可使用 "ps -e "查看进程和“netstat -apn | grep :1883”来查看谁占用端口,可以使用“kill -s 9 pid号”杀死该进程,而后从新输入“mosquitto”命令便可获得上图正确结果。 报错
步骤2:开启第二个终端,输入“mosquitto_sub -t 主题名 -i 用户名”, (后面的“-i 用户名可省略”) 例如:mosquitto_sub -t mqtt 结果如图,因为一直监听,因此也不会看到命令行。 发布消息前第二个终端截图
步骤3:开启第三个终端,输入“mosquitto_pub -t 主题名 -i 用户名 -m 要发送的消息” (若是要发送的消息中有空格,需用引号括起来) 例如:mosquitto_pub -h localhost -t mqtt -m "hello world" 则第二个终端能够收到这条信息。笔者看到其命令行有文件传输,又尝试传一个文件(内容只有一句话),第二个终端会直接显示文件的内容(截图中“hello World”下面的那句话就是)。尝试一个大文件的传输,将一个7M的书传过去,首先是能够传,可是第二个窗口显示的全是乱码,传输的速度也是一个问题。
发布消息后的第二个终端截图 这里之因此会想到传文件是由于看到mosquitto_pub的命令参数中有关于把文件当作message传输的记录,如图:这里的文件上限默认是256M。逻辑中有对文件大小的判断,超过256M的文件则不传。不知道这里若是吧这个值修改更大,会不会产生影响,笔者没有尝试,由于传7M的文件都感受很慢。(这个问题在MQTT协议介绍中能够获得答案,MQTT文件长度的表示是用1至4个字节来表示,而其表示长度的方式又有特殊的加密方式,按照这种方式,其最大表示的长度为256M) 测试总结 三个终端,一个用来开启服务,一个执行mosquitto_sub来订阅消息,与服务器保持长链接,随时接收来自服务器推送的消息,最后一个终端则用来发布消息。这个测试的结果如今是正确的,但仍存在局限性,还有如下几个问题须要注意: 1)了解mosquitto_sub和mosquitto_pub命令背后是如何执行的,须要修改,订阅端的处理确定不能仅仅是显示内容 到标准输出上。 2)了解mosquitto命令的逻辑,这里包含的内容不少,估计也是最难的。 3)这里的实验是在本地传输,须要作一个客户端出来(客户端多是Android端或者MCU端),看是否能够正常传输,还有就是能传多大的数据,容许同时连入的客户数有多少(听说是20000以上)。
mosquito monitor idea check process mosquitto with pidfile /var/run/mosquitto.pid start = "/etc/init.d/mosquitto start" stop = "/etc/init.d/mosquitto stop" mosquito 是一个MQTT 服务器。目前只在公司移动互联网服务器上环境部署过,用的比较少,show 一下安装过程: wget http://mosquitto.org/files/source/mosquitto-1.0.3.tar.gz tar -zxvf mosquitto-1.0.3.tar.gz cd mosquitto-1.0.3 make WITH_TLS_PSK=no (遇到undefined reference to `SSL_CTX_set_psk_server_callback' ,添加参数WITH_TLS_PSK=no 禁用SSL PSK 支持) make install prefix=/home/mosquitto mkdir /home/mosquitto/etc mv /etc/mosquitto/* /home/mosquitto/etc/ strip /home/mosquitto/bin/* strip /homo/mosquitto/sbin/* strip /homo/mosquitto/lib/* echo "/home/mosquitto/lib/" >> /etc/ld.so.conf ldconfig -f /etc/ld.so.conf 修改 /homo/mosquitto/etc/mosquitto.conf 用户: user nobody (其它参数目前使用默认值) 启动服务(若有服务相关错误,检查/home/mosquitto/mosquitto.log,端口默认1883) /home/mosquitto/sbin/mosquitto -d -c /home/mosquitto/etc/mosquitto.conf > /home/mosquitto/mosquitto.log 2>&1 终端测试: 客户端 mosquitto_sub -h SERVERIP -t test 服务器端执行后 /home/mosquitto/bin/mosquitto_pub -t test -m "123456" 客户端会成功收到"123456" 查看Mqtt订阅者运行情况 mosquitto_sub -v -t \$SYS/# 或者细化为其中一个命令 mosquitto_sub -v -t ‘$SYS/broker/clients/active’ monitor statitics script 100万并发链接之笔记 epoll详解 select,poll和epoll区别 高并发的epoll多线程实例 mosquitto的pthread没法使用 C1000链接 mosquitto源代码分析 Java NIO 单线程服务器 lighthttp vs apache vs ngix Java heap space error summary mqtt benchmark can not create native thread mosquitto 1.2.1代码优化 mosquitto 100k测试场景 Mosquitto pub/sub代码分析 优化: 对于后续的提升优化的地方,简单记录几点: 发送数据用writev poll -> epoll ,用以支持更高的冰法; 改成单线程版本,下降锁开销,目前锁开销仍是很是大的。目测能够改成单进程版本,相似redis,精心维护的话应该能达到不错的效果; 网络数据读写使用一次尽可能多读的方式,避免屡次进入系统调用; 内存操做优化。不free,留着下次用; 考虑使用spwan-fcgi的形式或者内置一次启动多个实例监听同一个端口。这样能更好的发挥机器性能,达到更高的性能; (MaxProcessMemory - JVMMemory - ReservedOsMemory) / (ThreadStackSize) = Number of threads MaxProcessMemory 指的是一个进程的最大内存 JVMMemory JVM内存 ReservedOsMemory 保留的操做系统内存 ThreadStackSize 线程栈的大小 在java语言里, 当你建立一个线程的时候,虚拟机会在JVM内存建立一个Thread对象同时建立一个操做系统线程,而这个系统线程的内存用的不是JVMMemory,而是系统中剩下的内存(MaxProcessMemory - JVMMemory - ReservedOsMemory)。 一哥们发送线程数和xss以及max user process 有关 epoll + 多线程实现并发网络链接处理 Mosquitto持久层群推消息实现思路 JAVA使用EPoll来进行NIO处理的方法 JDK 6.0 以及JDK 5.0 update 9 的 nio支持epoll (仅限 Linux 系统 ),对并发idle connection会有大幅度的性能提高,这就是不少网络服务器应用程序须要的。 启用的方法以下: -Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.EPollSelectorProvider 单进程FD上限是最大能够打开文件的数目, 这个数字通常远大于2048,举个例子,在1GB内存的机器上大约是10万左右,具体数目能够cat /proc/sys/fs/file-max mosquitto -c /etc/mosquitto/mosquitto.conf -d”便可开启服务 压力测试: 系统参数修改 ulimit -u 12000 (max processes) ulimit -n 12000 (max files) 测试最大链接数 #!/bin/bash c=1 while [ $c -le 18000 ] do mosquitto_sub -d -t hello/world -k 900 & (( c++ )) done # netstat -na | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}' kill -9 `ps -ef|grep mosquitto|awk '{print $2}'` ps -ef|grep report |grep -v grep|awk '{print $2}' |xargs kill -9 pkill -9 mosquitto killall -9 mosquitto 查看cpu 占用高的线程 ps H -eo user,pid,ppid,tid,time,%cpu,cmd --sort=%cpu 查看线程的I/O信息 /proc/15938/task/15942/fd lsof -c mosquitto [root@wwwu fd]# pstack 15938 Thread 5 (Thread 0x7f280e377700 (LWP 15939)): #0 0x0000003b15ae9163 in epoll_wait () from /lib64/libc.so.6 #1 0x000000000040c1d0 in mosquitto_main_loop_4_client () #2 0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0 #3 0x0000003b15ae8b6d in clone () from /lib64/libc.so.6 Thread 4 (Thread 0x7f280d976700 (LWP 15940)): #0 0x000000000040a464 in mqtt3_db_message_timeout_check_epoll () #1 0x000000000040bc3b in mosquitto_main_loop_4_client_all () #2 0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0 #3 0x0000003b15ae8b6d in clone () from /lib64/libc.so.6 Thread 3 (Thread 0x7f280cf75700 (LWP 15941)): #0 0x0000003b15ae9163 in epoll_wait () from /lib64/libc.so.6 #1 0x000000000040c1d0 in mosquitto_main_loop_4_client () #2 0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0 #3 0x0000003b15ae8b6d in clone () from /lib64/libc.so.6 Thread 2 (Thread 0x7f280c574700 (LWP 15942)): #0 0x000000000040a464 in mqtt3_db_message_timeout_check_epoll () #1 0x000000000040bc3b in mosquitto_main_loop_4_client_all () #2 0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0 #3 0x0000003b15ae8b6d in clone () from /lib64/libc.so.6 Thread 1 (Thread 0x7f280e59c7c0 (LWP 15938)): #0 0x0000003b15ae9163 in epoll_wait () from /lib64/libc.so.6 #1 0x000000000040c7c5 in mosquitto_main_loop_4_epoll () #2 0x000000000040414b in main () 实际测试 Test case 1 1 10934 max connections with QoS 0 [root@ ~]# netstat -na | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}' CLOSE_WAIT 1 ESTABLISHED 10934 LISTEN 12 脚本输出 Client mosqsub/13526-wwwu.mqtt sending CONNECT Client mosqsub/13526-wwwu.mqtt received CONNACK Client mosqsub/13526-wwwu.mqtt sending SUBSCRIBE (Mid: 1, Topic: hello/world, QoS: 0) Client mosqsub/13526-wwwu.mqtt received SUBACK Subscribed (mid: 1): 0 Client mosqsub/13523-wwwu.mqtt sending CONNECT Client mosqsub/13523-wwwu.mqtt received CONNACK Client mosqsub/13523-wwwu.mqtt sending SUBSCRIBE (Mid: 1, Topic: hello/world, QoS: 0) Client mosqsub/13523-wwwu.mqtt received SUBACK Subscribed (mid: 1): 0 Test Case 2: 该测试只是验证能打开的最大链接数,意义不太大 max files: ulimit -n 15000 [root@wwwu test]# netstat -na|grep ESTAB|grep 1883|wc -l 15088 cpu 2 core 只使用100%(total 200%),目前没有publish数据,因此memory使用量不多3%, 还能够继续提高 vmstat [root@wwwu test]# vmstat 3 procs -----------memory---------- ---swap-- -----io---- --system-- -----cpu----- r b swpd free buff cache si so bi bo in cs us sy id wa st 1 0 24080 1865596 233880 266228 1 3 2 4 2 11 0 0 99 0 0 1 0 24080 1865588 233880 266228 0 0 0 0 1030 30 17 33 50 0 0 [root@wwwu test]# top top - 03:17:46 up 13 days, 10 min, 4 users, load average: 1.09, 0.83, 0.37 Tasks: 164 total, 2 running, 162 sleeping, 0 stopped, 0 zombie Cpu0 : 31.2%us, 68.8%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st Cpu1 : 0.0%us, 0.3%sy, 0.0%ni, 99.7%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st Mem: 2957536k total, 1091948k used, 1865588k free, 233880k buffers Swap: 3096568k total, 24080k used, 3072488k free, 266228k cached PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND 6709 root 20 0 150m 113m 784 R 100.0 3.9 15:16.63 mosquitto 发现问题 no route to host; telnet: Unable to connect to remote host: No route to host 以为甚是差别,估计是虚拟机装了有问题,就把虚拟机中的防火墙给清了一下,发现可行。 [zhoulei@localhost ~]$ sudo iptables -F mosquitto没法启动 发现netstat -an|grep 1883 没有发现记录,发现mosquitto.db文件很大445M,把该文件删除后, 能够正常启动 2014/05/07 结果 ------------------------------------------------------------------------------------------------ 【subscriber】 开6000个subscribe active 链接,脚本以下 java -Xms150M -Xmx350M -Djava.ext.dirs=./ -cp /usr/Paul MQTT31PerfHarness -su -nt 6000 -ss 10 -sc BasicStats -rl 0 -wp true -wc 50 -wi 60 -wt 90 -id 1 -qos 2 -ka 600 -cs false -tc mqtt.Subscriber -d TOPICSINGEL -db 1 -dx 20000 -dn 1 -iu tcp://9.119.154.107:1883 kswapd0 进程跳来跳去 top - 13:16:13 up 31 min, 4 users, load average: 617.83, 187.04, 65.29 Tasks: 161 total, 1 running, 160 sleeping, 0 stopped, 0 zombie Cpu(s): 2.9%us, 58.7%sy, 0.0%ni, 0.0%id, 34.5%wa, 0.0%hi, 3.9%si, 0.0%st Mem: 2957536k total, 2899268k used, 58268k free, 984k buffers Swap: 3096568k total, 805564k used, 2291004k free, 14656k cached PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND 2538 root 20 0 25.2g 1.7g 4420 S 122.1 60.3 5:27.12 java [root@wwwu ~]# free -m total used free shared buffers cached Mem: 2888 2810 77 0 0 15 id=1,rate=996.20,threads=6000 id=1,rate=1007.30,threads=6000 以后司机 publish 只有700M可用内存 [root@oc8050533176 Local]# top top - 17:13:51 up 3:44, 2 users, load average: 0.00, 0.00, 0.05 Tasks: 124 total, 1 running, 123 sleeping, 0 stopped, 0 zombie Cpu(s): 14.7%us, 15.2%sy, 0.0%ni, 64.6%id, 0.0%wa, 0.0%hi, 5.4%si, 0.0%st Mem: 2011348k total, 1736976k used, 274372k free, 23976k buffers Swap: 8388600k total, 0k used, 8388600k free, 803732k cached PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND 1946 root 20 0 3362m 591m 7292 S 66.7 30.1 3:22.69 java 最多2500个publish active 链接, throughput 3242 msgs/second, id=5,rate=3084.80,threads=2000 id=5,rate=2324.60,threads=2000 id=5,rate=3176.30,threads=2000 id=5,rate=3091.40,threads=2000 [mosquitto] connection 后的top [root@wwwu mosquitto]# top -b top - 05:50:55 up 2:00, 4 users, load average: 1.00, 0.81, 0.74 Tasks: 161 total, 2 running, 159 sleeping, 0 stopped, 0 zombie Cpu(s): 31.9%us, 7.4%sy, 0.0%ni, 54.8%id, 3.2%wa, 0.0%hi, 2.6%si, 0.0%st Mem: 2957536k total, 2518020k used, 439516k free, 31976k buffers Swap: 3096568k total, 0k used, 3096568k free, 209064k cached PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND 2509 root 20 0 1264m 1.2g 740 R 77.9 42.5 93:35.86 mosquitto [root@wwwu ~]# netstat -an|grep ESTABLISH|grep 1883|wc -l 8000 遇到问题 [root@wwwu ~]# netstat -na | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}' CLOSE_WAIT 1 FIN_WAIT1 298 ESTABLISHED 2372 LISTEN 12 tcp 0 21848 9.119.154.107:1883 9.119.154.160:55851(sub) FIN_WAIT1 tcp 0 37393 9.119.154.107:1883 9.119.154.160:57275 FIN_WAIT1 tcp 0 0 9.119.154.107:1883 9.119.154.160:56913 FIN_WAIT2 tcp 0 40545 9.119.154.107:1883 9.119.154.160:55864 FIN_WAIT1 netstat显示的链接状态有几种WAIT: FIN_WAIT_1,FIN_WAIT_2,CLOSE_WAIT和TIME_WAIT. 他们的含义要从TCP的链接中断过程提及 Server Client -------- FIN --------> <------- ACK --------- <------- FIN --------- -------- ACK --------> 假设服务器主动关闭链接(Active Close) 服务器首先向客户机发送FIN包,而后服务器进入FIN_WAIT_1状态。 客户机向服务器确认FIN包收到,向服务器发送FIN/ACK,客户机进入CLOSE_WAIT状态。 服务器收到来自客户机的FIN/ACK后,进入FIN_WAIT_2状态 如今客户机进入被动关闭(“passive close”)状态,客户机操做系统等待他上面的应用程序关闭链接。一旦链接被关闭,客户端会发送FIN包到服务器 当服务器收到FIN包后,服务器会向客户机发送FIN/ACK确认,而后进入著名的TIME_WAIT状态 因为在链接关闭后,还不能肯定全部链接关闭前的包都被服务器接受到了(包的接受是没有前后顺序的),所以有了TIME_WAIT状态。在这个状态中,服务器仍然在等待客户机发送的可是还未到达服务器的包。这个状态将保持2*MSL的时间,这里的MSL指的是一个TCP包在网络中存在的最长时间。通常状况下2*MSL=240秒。 ------------------------------------------------------------------------------------- 10000 sub connections mosquitto 2G 内存 32% CPU Tasks: 161 total, 1 running, 160 sleeping, 0 stopped, 0 zombie Cpu0 : 3.7%us, 11.6%sy, 0.0%ni, 84.7%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st Cpu1 : 4.3%us, 13.6%sy, 0.0%ni, 82.1%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st Mem: 2957536k total, 2202836k used, 754700k free, 34916k buffers Swap: 3096568k total, 0k used, 3096568k free, 48164k cached PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND 2509 root 20 0 1264m 1.2g 740 S 32.6 42.5 148:35.98 mosquitto -----------------boss worker multithread model--------------------------- int queue[QUEUE_SIZE]; This is the main thread. It creates a queue struct (defined elsewhere) with methods enqueue, dequeue, empty, etc. When the server accepts a connection, it enqueues the socket that the incoming connection is on. The worker threads which were dispatched at the beginning are constantly checking this queue to see if any jobs have been added, and if there are jobs, then they dequeue the socket, connect to that port, and read/parse/write the incoming http request. int main(int argc, char* argv[]) { int hSocket, hServerSocket; struct hostent* pHostInfo; struct sockaddr_in Address; int nAddressSize = sizeof(struct sockaddr_in); int nHostPort; int numThreads; int i; init(&head,&tail); / hServerSocket=socket(AF_INET,SOCK_STREAM,0); if(hServerSocket == SOCKET_ERROR) { printf("\nCould not make a socket\n"); return 0; } Address.sin_addr.s_addr = INADDR_ANY; Address.sin_port = htons(nHostPort); Address.sin_family = AF_INET; printf("\nBinding to port %d\n",nHostPort); if(bind(hServerSocket,(struct sockaddr*)&Address,sizeof(Address)) == SOCKET_ERROR) { printf("\nCould not connect to host\n"); return 0; } getsockname(hServerSocket, (struct sockaddr *) &Address,(socklen_t *)&nAddressSize); printf("Opened socket as fd (%d) on port (%d) for stream i/o\n",hServerSocket, ntohs(Address.sin_port)); printf("Server\n\ sin_family = %d\n\ sin_addr.s_addr = %d\n\ sin_port = %d\n" , Address.sin_family , Address.sin_addr.s_addr , ntohs(Address.sin_port) ); //Up to this point is boring server set up stuff. I need help below this. / if(listen(hServerSocket,QUEUE_SIZE) == SOCKET_ERROR) { printf("\nCould not listen\n"); return 0; } while(1) { pthread_mutex_lock(&mtx); printf("\nWaiting for a connection"); while(!empty(head,tail)) { pthread_cond_wait (&cond2, &mtx); } hSocket = accept(hServerSocket,(struct sockaddr*)&Address,(socklen_t *)&nAddressSize); printf("\nGot a connection"); enqueue(queue,&tail,hSocket); pthread_mutex_unlock(&mtx); pthread_cond_signal(&cond); // wake worker thread } } void *worker(void *threadarg) { while(true) { pthread_mutex_lock(&mtx); while(empty(head,tail)) { pthread_cond_wait(&cond, &mtx); } int hSocket = dequeue(queue,&head); unsigned nSendAmount, nRecvAmount; char line[BUFFER_SIZE]; nRecvAmount = read(hSocket,line,sizeof line); printf("\nReceived %s from client\n",line); / if(close(hSocket) == SOCKET_ERROR) { printf("\nCould not close socket\n"); return 0; } pthread_mutex_unlock(&mtx); pthread_cond_signal(&cond); } epoll与select、poll区别 一、相比于select与poll,epoll最大的好处在于它不会随着监听fd数目的增加而下降效率。内核中的select与poll的实现是采用轮询来处理的,轮询的fd数目越多,天然耗时越多。 二、epoll的实现是基于回调的,若是fd有指望的事件发生就经过回调函数将其加入epoll就绪队列中,也就是说它只关心“活跃”的fd,与fd数目无关。 三、内核 / 用户空间 内存拷贝问题,如何让内核把 fd消息通知给用户空间呢?在这个问题上select/poll采起了内存拷贝方法。而epoll采用了共享内存的方式。 四、epoll不只会告诉应用程序有I/0 事件到来,还会告诉应用程序相关的信息,这些信息是应用程序填充的,所以根据这些信息应用程序就能直接定位到事件,而没必要遍历整个fd集合。 epoll 的EPOLLLT (水平触发,默认)和 EPOLLET(边沿触发)模式的区别 一、EPOLLLT:彻底靠kernel epoll驱动,应用程序只须要处理从epoll_wait返回的fds,这些fds咱们认为它们处于就绪状态。此时epoll能够认为是更快速的poll。 二、EPOLLET:此模式下,系统仅仅通知应用程序哪些fds变成了就绪状态,一旦fd变成就绪状态,epoll将再也不关注这个fd的任何状态信息,(从epoll队列移除)直到应用程序经过读写操做(非阻塞)触发EAGAIN状态,epoll认为这个fd又变为空闲状态,那么epoll又从新关注这个fd的状态变化(从新加入epoll队列)。随着epoll_wait的返回,队列中的fds是在减小的,因此在大并发的系统中,EPOLLET更有优点,可是对程序员的要求也更高,由于有可能会出现数据读取不完整的问题,举例以下: 假设如今对方发送了2k的数据,而咱们先读取了1k,而后这时调用了epoll_wait,若是是边沿触发,那么这个fd变成就绪状态就会从epoll 队列移除,极可能epoll_wait 会一直阻塞,忽略还没有读取的1k数据,与此同时对方还在等待着咱们发送一个回复ack,表示已经接收到数据;若是是电平触发,那么epoll_wait 还会检测到可读事件而返回,咱们能够继续读取剩下的1k 数据。 man epoll example #define MAX_EVENTS 10 struct epoll_event ev, events[MAX_EVENTS]; int listen_sock, conn_sock, nfds, epollfd; epollfd = epoll_create(10); if (epollfd == -1) { perror("epoll_create"); exit(EXIT_FAILURE); } ev.events = EPOLLIN; ev.data.fd = listen_sock; if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) { perror("epoll_ctl: listen_sock"); exit(EXIT_FAILURE); } for (;;) { nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1); if (nfds == -1) { perror("epoll_pwait"); exit(EXIT_FAILURE); } for (n = 0; n < nfds; ++n) { if (events[n].data.fd == listen_sock) { conn_sock = accept(listen_sock, (struct sockaddr *) &local, &addrlen); if (conn_sock == -1) { perror("accept"); exit(EXIT_FAILURE); } setnonblocking(conn_sock); ev.events = EPOLLIN | EPOLLET; ev.data.fd = conn_sock; if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock, &ev) == -1) { perror("epoll_ctl: conn_sock"); exit(EXIT_FAILURE); } } else { do_use_fd(events[n].data.fd); } } }
Mosquitto简要教程(安装/使用/测试) 上篇文章《Android主流推送方案分析(MQTT/XMPP/GCM)》中,咱们给你们介绍了,如何在移动领域使用灵巧的消息传输协议MQTT来完成消息推送,最后也提到了开源项目Mosquitto。实际上,Mosquitto是一个实现了MQTT3.1协议的代理服务器,由MQTT协议创始人之一的Andy Stanford-Clark开发,它为咱们提供了很是棒的轻量级数据交换的解决方案。本文的主旨在于记录Mosquitto服务的安装和使用,以备往后查阅。 1、获取&安装 Mosquitto提供了Windows、Linux以及qnx系统的版本,安装文件可从http://mosquitto.org/files/binary/地址中获取(建议使用最新的1.1.x版本)。Windows系统下的安装过程很是简单,咱们甚至能够把Mosquitto直接安装成为系统服务;可是,在实际应用中,咱们更倾向于使用Linux系统的服务器,接下来咱们就将重点介绍Linux版Mosquitto的安装方法。 在Linux系统上安装Mosquitto,本人建议你们使用源码安装模式,最新的源码可从http://mosquitto.org/files/source/地址中获取。解压以后,咱们能够在源码目录里面找到主要的配置文件config.mk,其中包含了全部Mosquitto的安装选项,详细的参数说明以下: [html] view plaincopy # 是否支持tcpd/libwrap功能. #WITH_WRAP:=yes # 是否开启SSL/TLS支持 #WITH_TLS:=yes # 是否开启TLS/PSK支持 #WITH_TLS_PSK:=yes # Comment out to disable client client threading support. #WITH_THREADING:=yes # 是否使用严格的协议版本(老版本兼容会有点问题) #WITH_STRICT_PROTOCOL:=yes # 是否开启桥接模式 #WITH_BRIDGE:=yes # 是否开启持久化功能 #WITH_PERSISTENCE:=yes # 是否监控运行状态 #WITH_MEMORY_TRACKING:=yes 这里须要注意的是,默认状况下Mosquitto的安装须要OpenSSL的支持;若是不须要SSL,则须要关闭config.mk里面的某些与SSL功能有关的选项(WITH_TLS、WITH_TLS_PSK)。接着,就是运行make install进行安装,完成以后会在系统命令行里发现mosquitto、mosquitto_passwd、mosquitto_pub和mosquitto_sub四个工具(截图以下),分别用于启动代理、管理密码、发布消息和订阅消息。 2、配置&运行 安装完成以后,全部配置文件会被放置于/etc/mosquitto/目录下,其中最重要的就是Mosquitto的配置文件,即mosquitto.conf,如下是详细的配置参数说明。 [html] view plaincopy # ================================================================= # General configuration # ================================================================= # 客户端心跳的间隔时间 #retry_interval 20 # 系统状态的刷新时间 #sys_interval 10 # 系统资源的回收时间,0表示尽快处理 #store_clean_interval 10 # 服务进程的PID #pid_file /var/run/mosquitto.pid # 服务进程的系统用户 #user mosquitto # 客户端心跳消息的最大并发数 #max_inflight_messages 10 # 客户端心跳消息缓存队列 #max_queued_messages 100 # 用于设置客户端长链接的过时时间,默认永不过时 #persistent_client_expiration # ================================================================= # Default listener # ================================================================= # 服务绑定的IP地址 #bind_address # 服务绑定的端口号 #port 1883 # 容许的最大链接数,-1表示没有限制 #max_connections -1 # cafile:CA证书文件 # capath:CA证书目录 # certfile:PEM证书文件 # keyfile:PEM密钥文件 #cafile #capath #certfile #keyfile # 必须提供证书以保证数据安全性 #require_certificate false # 若require_certificate值为true,use_identity_as_username也必须为true #use_identity_as_username false # 启用PSK(Pre-shared-key)支持 #psk_hint # SSL/TSL加密算法,可使用“openssl ciphers”命令获取 # as the output of that command. #ciphers # ================================================================= # Persistence # ================================================================= # 消息自动保存的间隔时间 #autosave_interval 1800 # 消息自动保存功能的开关 #autosave_on_changes false # 持久化功能的开关 persistence true # 持久化DB文件 #persistence_file mosquitto.db # 持久化DB文件目录 #persistence_location /var/lib/mosquitto/ # ================================================================= # Logging # ================================================================= # 4种日志模式:stdout、stderr、syslog、topic # none 则表示不记日志,此配置能够提高些许性能 log_dest none # 选择日志的级别(可设置多项) #log_type error #log_type warning #log_type notice #log_type information # 是否记录客户端链接信息 #connection_messages true # 是否记录日志时间 #log_timestamp true # ================================================================= # Security # ================================================================= # 客户端ID的前缀限制,可用于保证安全性 #clientid_prefixes # 容许匿名用户 #allow_anonymous true # 用户/密码文件,默认格式:username:password #password_file # PSK格式密码文件,默认格式:identity:key #psk_file # pattern write sensor/%u/data # ACL权限配置,经常使用语法以下: # 用户限制:user <username> # 话题限制:topic [read|write] <topic> # 正则限制:pattern write sensor/%u/data #acl_file # ================================================================= # Bridges # ================================================================= # 容许服务之间使用“桥接”模式(可用于分布式部署) #connection <name> #address <host>[:<port>] #topic <topic> [[[out | in | both] qos-level] local-prefix remote-prefix] # 设置桥接的客户端ID #clientid # 桥接断开时,是否清除远程服务器中的消息 #cleansession false # 是否发布桥接的状态信息 #notifications true # 设置桥接模式下,消息将会发布到的话题地址 # $SYS/broker/connection/<clientid>/state #notification_topic # 设置桥接的keepalive数值 #keepalive_interval 60 # 桥接模式,目前有三种:automatic、lazy、once #start_type automatic # 桥接模式automatic的超时时间 #restart_timeout 30 # 桥接模式lazy的超时时间 #idle_timeout 60 # 桥接客户端的用户名 #username # 桥接客户端的密码 #password # bridge_cafile:桥接客户端的CA证书文件 # bridge_capath:桥接客户端的CA证书目录 # bridge_certfile:桥接客户端的PEM证书文件 # bridge_keyfile:桥接客户端的PEM密钥文件 #bridge_cafile #bridge_capath #bridge_certfile #bridge_keyfile # 本身的配置能够放到如下目录中 include_dir /etc/mosquitto/conf.d 最后,启动Mosquitto服务很简单,直接运行命令行“mosquitto -c /etc/mosquitto/mosquitto.conf -d”便可开启服务。接下来,就让咱们尽情体验Mosquitto的强大功能吧!固然,有了Mosquitto,咱们就能够安心地抛弃“简陋”的rsmb了,有兴趣的话,你们还能够尝试把Mosquitto服务运用到上一篇的Android推送服务中。 另外,Mosquitto是个异步IO框架,经测试能够轻松处理20000个以上的客户端链接。固然,实际的最大承载量还和业务的复杂度还有比较大的关系。下图是本人在一台普通Linux机器上进行的压力测试结果,你们能够参考。 友情提醒:测试的时候不要忘记调整系统的最大链接数和栈大小,好比:Linux上可用ulimit -n20000 -s512命令设置你须要的系统参数。 http://blog.csdn.net/shagoo/article/details/7910598