MQTT定义了物联网传输协议,其标准倾向于原始TCP实现。构建于TCP的上层协议堆栈,诸如HTTP等,在空间上多了一些处理路径,稍微耗费了CPU和内存,虽看似微乎其微,但对不少处理能力不足的嵌入式设备而言,选择原始的TCP倒是最好的选择。java
但单纯TCP不是全部物件联网的最佳选择,提供构建与TCP基础之上的传统的HTTP通讯支持,尤为是浏览器、性能富裕的桌面涉及领域,仍是企业最 可信赖、最可控的传输方式之一。支持多种多样的链接通道,让目前全部一切皆可联网,除了原始TCP Socket,还要支持构建于其之上的HTTP、HTML5 Websocket,就颇有必要。git
mqtt.io,Pub/Sub中间件,也能够称之为推送服务器,涵盖全部主流桌面系统、浏览器平台,而且倾斜 于移动互联网,以及物联网的广阔适应天地。使用一句英文归纳可能更为合适:"Make everything connect”,让全部物件均可链接。其业务目标,可用下图归纳:github
mqtt.io致力于作下一代支持全部主流桌面平台、全部主流浏览器、全部可联网物件均可以联网的PUB/SUB消息推送系统。浏览器
构建此系统,在于下降传统企业各自分散的推送系统,统一运营,统一管理,节省人员、运维开支。服务器
用于转换二进制流到JAVA对象的过程:app
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051 |
package io.mqtt.handler.coder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.io.ByteArrayInputStream;
import java.util.List;
import org.meqantt.message.Message;
import org.meqantt.message.MessageInputStream;
public class MqttMessageNewDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf buf,
List<Object> out) throws Exception {
if (buf.readableBytes() < 2) {
return;
}
buf
.markReaderIndex();
buf
.readByte(); // read away header
int msgLength = 0;
int multiplier = 1;
int digit;
int lengthSize = 0;
do {
lengthSize
++;
digit
= buf.readByte();
msgLength
+= (digit & 0x7f) * multiplier;
multiplier
*= 128;
if ((digit & 0x80) > 0 && !buf.isReadable()) {
buf
.resetReaderIndex();
return;
}
}
while ((digit & 0x80) > 0);
if (buf.readableBytes() < msgLength) {
buf
.resetReaderIndex();
return;
}
byte[] data = new byte[1 + lengthSize + msgLength];
buf
.resetReaderIndex();
buf
.readBytes(data);
MessageInputStream mis = new MessageInputStream(
new ByteArrayInputStream(data));
Message msg = mis.readMessage();
mis
.close();
out
.add(msg);
}
}
|
对全部要写入网卡缓冲区的JAVA对象转换成二进制:运维
12345678910111213141516171819202122232425 |
package io.mqtt.handler.coder;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
import org.meqantt.message.Message;
@
Sharable
public class MqttMessageNewEncoder extends MessageToMessageEncoder<Object> {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg,
List<Object> out) throws Exception {
if (!(msg instanceof Message)) {
return;
}
byte[] data = ((Message) msg).toBytes();
out
.add(Unpooled.wrappedBuffer(data));
}
}
|
借助于mqtt-library项目,编解码不复杂。socket
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
package io.mqtt.handler;
import io.mqtt.processer.ConnectProcesser;
import io.mqtt.processer.DisConnectProcesser;
import io.mqtt.processer.PingReqProcesser;
import io.mqtt.processer.Processer;
import io.mqtt.processer.PublishProcesser;
import io.mqtt.processer.SubscribeProcesser;
import io.mqtt.processer.UnsubscribeProcesser;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.ReadTimeoutException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.meqantt.message.ConnAckMessage;
import org.meqantt.message.ConnAckMessage.ConnectionStatus;
import org.meqantt.message.DisconnectMessage;
import org.meqantt.message.Message;
import org.meqantt.message.Message.Type;
import org.meqantt.message.PingRespMessage;
public class MqttMessageHandler extends ChannelInboundHandlerAdapter {
private static PingRespMessage PINGRESP = new PingRespMessage();
private static final Map<Message.Type, Processer> processers;
static {
Map<Message.Type, Processer> map = new HashMap<Message.Type, Processer>(
6);
map
.put(Type.CONNECT, new ConnectProcesser());
map
.put(Type.PUBLISH, new PublishProcesser());
map
.put(Type.SUBSCRIBE, new SubscribeProcesser());
map
.put(Type.UNSUBSCRIBE, new UnsubscribeProcesser());
map
.put(Type.PINGREQ, new PingReqProcesser());
map
.put(Type.DISCONNECT, new DisConnectProcesser());
processers
= Collections.unmodifiableMap(map);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable e)
throws Exception {
try {
if (e.getCause() instanceof ReadTimeoutException) {
ctx
.write(PINGRESP).addListener(
ChannelFutureListener.CLOSE_ON_FAILURE);
}
else {
ctx
.channel().close();
}
}
catch (Throwable t) {
t
.printStackTrace();
ctx
.channel().close();
}
e
.printStackTrace();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object obj)
throws Exception {
Message msg = (Message) obj;
Processer p = processers.get(msg.getType());
if (p == null) {
return;
}
Message rmsg = p.proc(msg, ctx);
if (rmsg == null) {
return;
}
if (rmsg instanceof ConnAckMessage
&& ((ConnAckMessage) rmsg).getStatus() != ConnectionStatus.ACCEPTED) {
ctx
.write(rmsg).addListener(ChannelFutureListener.CLOSE);
}
else if (rmsg instanceof DisconnectMessage) {
ctx
.write(rmsg).addListener(ChannelFutureListener.CLOSE);
}
else {
ctx
.write(rmsg).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx
.flush();
}
}
|
更具体的能够查看项目。ide
简单介绍了一个简单的不能再简单的MQTT Server,只具备最基本的QoS 0类型的消息订阅等。性能
后面,对HTML 5 Websocket,会在现有基础代码之上,不作多大改动,增长对MQTT Over WebSocket的支持。