运行环境:php
技术栈:html
IDE:java
近年来,物联网高歌猛进,美国有“工业互联网”,德国有“工业4.0”,我国也有“中国制造2025”,这背后都是云计算、大数据。据波士顿咨询报告,单单中国制造业,云计算、大数据、人工智能等新技术就能为其带来高达6万亿的额外附加值。python
国内外巨头纷纷驻足工业互联网,国外如亚马逊AWS、微软Azure,国内则是三大电信运营商、百度云、华为、金山云等,其中腾讯云、阿里云最甚,还拉来了传统制造大佬,国内巨头纷纷在物联网上布局。在2018云栖-深圳峰会上,阿里巴巴资深副总裁,阿里云总裁胡晓明宣布阿里巴巴将正式进军IoT。胡晓明表示,IoT是阿里巴巴集团继电商、金融、物流、云计算以后的一条新的主赛道。git
以上这些内容,做者做为一个开发人员,并非一个投资人员和创业先锋。并不太关系这些具体细节。我所关心的是如何用技术去实现或者模拟一个支持百万连接的IOT服务器,并不严谨,仅作你们参考。github
关于为何选用下图的中间件或者对MQTT不太了解的话,能够阅读我以前的2篇文章:golang
git clone github.com/sanshengshu…web
cd netty-iotredis
运行 NettyIotApplicationdocker
启动Eclipse Paho,并填写用户名和密码,便可链接。
另起一个Eclipse Paho,订阅随意主题,例如test。另外一个Eclipse Paho发布主题test。便可收到消息。
取消主题订阅,再次发布消息。就收不到消息。
有了前面2篇文章的铺垫并学习了MQTT V3.1.1 协议,说了那么多,手痒痒的很。
You build it, You run it!
netty-iot
├── auth -- 认证
├── service -- 用户名,密码认证明现类
├── util -- 认证工具类
├── common -- 公共类
├── auth -- 用户名,密码认证接口
├── message -- 协议存储实体及接口类
├── session -- session存储实体及接口类
├── subscribe -- 订阅存储实体及接口类
├── config -- Redis配置
├── protocol -- MQTT协议实现
├── server -- MQTT服务器
├── store -- Redis数据存储
├── cache
├── message
├── session
├── subscribe
├── web -- web服务
├── NettyIotApplication -- 服务启动类
复制代码
体验 Redis 须要使用 Linux 或者 Mac 环境,若是是 Windows 能够考虑使用虚拟机。主要方式有四种:
具体操做以下:
Docker 方式
# 拉取 redis 镜像
> docker pull redis
# 运行 redis 容器
> docker run --name myredis -d -p6379:6379 redis
# 执行容器中的 redis-cli,能够直接使用命令行操做 redis
> docker exec -it myredis redis-cli...
复制代码
Github 源码编译方式
# 下载源码
> git clone --branch 2.8 --depth 1 git@github.com:antirez/redis.git
> cd redis
# 编译
> make
> cd src
# 运行服务器,daemonize表示在后台运行
> ./redis-server --daemonize yes
# 运行命令行
> ./redis-cli...
复制代码
直接安装方式
# mac
> brew install redis
# ubuntu
> apt-get install redis
# redhat
> yum install redis
# 运行客户端
> redis-cli
复制代码
Spring Boot除了支持常见的ORM框架外,更是对经常使用的中间件提供了很是好封装,随着
Spring Boot2.x的到来,支持的组件愈来愈丰富,也愈来愈成熟,其中对
Redis的支持不只仅是丰富了它的API,更是替换掉底层
Jedis的依赖,取而代之换成了
Lettuce(生菜),你们能够参考这篇文章对工程进行配置。因此我使用Lettuce做为客户端来对个人MQTT协议传输的消息进行缓存。
下列的是Redis所对应的操做方式
我主要使用opsForValue,opsForHash和opsForZSet,对于字符串。我推荐使用StringRedisTemplate。
如下对于opsForValue和opsForHash的基础操做,我在这里简短的讲解一下。
Redis的散列可让用户将多个键值对存储到一个Redis键里面。 public interface HashOperations<H,HK,HV> HashOperations提供一系列方法操做hash:
java > template.opsForHash().put("books","java","think in java");
redis-cli > hset books java "think in java" # 命令行的字符串若是包含空格,要用引号括起来
(integer) 1
------
java > template.opsForHash().put("books","golang","concurrency in go");
redis-cli > hset books golang "concurrency in go"
(integer) 1
------
java > template.opsForHash().put("books","python","python cookbook");
redis-cli > hset books python "python cookbook"
(integer) 1
------
java > template.opsForHash().entries("books")
redis-cli > hgetall books # entries(),key 和 value 间隔出现
1) "java"
2) "think in java"
3) "golang"
4) "concurrency in go"
5) "python"
6) "python cookbook"
------
java > template.opsForHash().size("books")
redis-cli > hlen books
(integer) 3
------
java > template.opsForHash().get("redisHash","age")
redi-cli > hget books java
"think in java"
------
java >
Map<String,Object> testMap = new HashMap();
testMap.put("java","effective java");
testMap.put("python","learning python");
testMap.put("golang","modern golang programming");
template.opsForHash().putAll("books",testMap);
redis-cli > hmset books java "effective java" python "learning python" golang "modern golang programming" # 批量 set
OK...
复制代码
Redis的Set是string类型的无序集合。集合成员是惟一的,这就意味着集合中不能出现重复的数据。 Redis 中 集合是经过哈希表实现的,因此添加,删除,查找的复杂度都是O(1)。
java > template.opsForSet().add("python","java","golang")
redis-cli > sadd books python java golang
(integer) 3
------
java > template.opsForSet().members("books")
redis-cli > smembers books # 注意顺序,和插入的并不一致,由于 set 是无序的
1) "java"
2) "python"
3) "golang"
------
java > template.opsForSet().isMember("books","java")
redis-cli > sismember books java # 查询某个 value 是否存在,至关于 contains(o)
(integer) 1
------
java > template.opsForSet().size("books")
redis-cli > scard books # 获取长度至关于 count()
(integer) 3
------
java > template.opsForSet().pop("books")
redis-cli > spop books # 弹出一个
"java"...
复制代码
MQTT是一种轻量级的发布/订阅消息传递协议,最初由IBM和Arcom(后来成为Eurotech的一部分)于1998年左右建立。如今,MQTT 3.1.1规范已由OASIS联盟标准化。
对于MQTT客户端,我选用Eclipse Paho,Eclipse Paho项目提供针对物联网(IoT)的新的,现有的和新兴的应用程序的MQTT和MQTT-SN消息传递协议的开源客户端实现。具体下载地址,你们根据本身的操做系统自行下载。
├── Connect -- 链接服务端
├── DisConnect -- 断开链接
├── PingReq -- 心跳请求
├── PubAck -- 发布确认
├── PubComp -- 发布完成(QoS2,第散步)
├── Publish -- 发布消息
├── PubRec -- 发布收到(QoS2,第一步)
├── PubRel -- 发布释放(QoS2,第二步)
├── Subscribe -- 订阅主题
├── UnSubscribe -- 取消订阅
复制代码
让咱们对照着MQTT 3.1.1协议来实现客户端Connect协议。
当咱们对消息解码时,若是协议名不正确服务端能够断开客户端的链接,按照本规范,服务端不能继续处理CONNECT报。
服务端使用客户端标识符 (ClientId) 识别客户端。链接服务端的每一个客户端都有惟一的客户端标识符(ClientId)。
// 消息解码器出现异常
if (msg.decoderResult().isFailure()) {
Throwable cause = msg.decoderResult().cause();
if (cause instanceof MqttUnacceptableProtocolVersionException) {
// 不支持的协议版本
MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false), null);
channel.writeAndFlush(connAckMessage);
channel.close();
return;
} else if (cause instanceof MqttIdentifierRejectedException) {
// 不合格的clientId
MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null);
channel.writeAndFlush(connAckMessage);
channel.close();
return;
}
channel.close();
return;
}
复制代码
clientId为空或null的状况, 这里要求客户端必须提供clientId, 无论cleanSession是否为1, 此处没有参考标准协议实现
if (StrUtil.isBlank(msg.payload().clientIdentifier())) {
MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null);
channel.writeAndFlush(connAckMessage);
channel.close();
return;
}
复制代码
用户名和密码验证, 这里要求客户端链接时必须提供用户名和密码, 不论是否设置用户名标志和密码标志为1, 此处没有参考标准协议实现
String username = msg.payload().userName();
String password = msg.payload().passwordInBytes() == null ? null : new String(msg.payload().passwordInBytes(), CharsetUtil.UTF_8);
if (!grozaAuthService.checkValid(username,password)) {
MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false), null);
channel.writeAndFlush(connAckMessage);
channel.close();
return;
}
复制代码
若是会话中已存储这个新链接的clientId, 就关闭以前该clientId的链接
if (grozaSessionStoreService.containsKey(msg.payload().clientIdentifier())){
SessionStore sessionStore = grozaSessionStoreService.get(msg.payload().clientIdentifier());
Channel previous = sessionStore.getChannel();
Boolean cleanSession = sessionStore.isCleanSession();
if (cleanSession){
grozaSessionStoreService.remove(msg.payload().clientIdentifier());
grozaSubscribeStoreService.removeForClient(msg.payload().clientIdentifier());
grozaDupPublishMessageStoreService.removeByClient(msg.payload().clientIdentifier());
grozaDupPubRelMessageStoreService.removeByClient(msg.payload().clientIdentifier());
}
previous.close();
}
复制代码
处理遗嘱信息
SessionStore sessionStore = new SessionStore(msg.payload().clientIdentifier(), channel, msg.variableHeader().isCleanSession(), null);
if (msg.variableHeader().isWillFlag()){
MqttPublishMessage willMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH,false, MqttQoS.valueOf(msg.variableHeader().willQos()),msg.variableHeader().isWillRetain(),0),
new MqttPublishVariableHeader(msg.payload().willTopic(),0),
Unpooled.buffer().writeBytes(msg.payload().willMessageInBytes())
);
sessionStore.setWillMessage(willMessage);
}
复制代码
处理链接心跳包
if (msg.variableHeader().keepAliveTimeSeconds() > 0){
if (channel.pipeline().names().contains("idle")){
channel.pipeline().remove("idle");
}
channel.pipeline().addFirst("idle",new IdleStateHandler(0, 0, Math.round(msg.variableHeader().keepAliveTimeSeconds() * 1.5f)));
}
复制代码
至此存储会话消息及返回接受客户端链接 将clientId存储到channel的map中
grozaSessionStoreService.put(msg.payload().clientIdentifier(),sessionStore);
channel.attr(AttributeKey.valueOf("clientId")).set(msg.payload().clientIdentifier());
Boolean sessionPresent = grozaSessionStoreService.containsKey(msg.payload().clientIdentifier()) && !msg.variableHeader().isCleanSession();
MqttConnAckMessage okResp = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK,false,MqttQoS.AT_MOST_ONCE,false,0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED,sessionPresent),
null
);
channel.writeAndFlush(okResp);
复制代码
若是cleanSession为0, 须要重发同一clientId存储的未完成的QoS1和QoS2的DUP消息
if (!msg.variableHeader().isCleanSession()){
List<DupPublishMessageStore> dupPublishMessageStoreList = grozaDupPublishMessageStoreService.get(msg.payload().clientIdentifier());
List<DupPubRelMessageStore> dupPubRelMessageStoreList = grozaDupPubRelMessageStoreService.get(msg.payload().clientIdentifier());
dupPublishMessageStoreList.forEach(dupPublishMessageStore -> {
MqttPublishMessage publishMessage = (MqttPublishMessage)MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH,true,MqttQoS.valueOf(dupPublishMessageStore.getMqttQoS()),false,0),
new MqttPublishVariableHeader(dupPublishMessageStore.getTopic(),dupPublishMessageStore.getMessageId()),
Unpooled.buffer().writeBytes(dupPublishMessageStore.getMessageBytes())
);
channel.writeAndFlush(publishMessage);
});
dupPubRelMessageStoreList.forEach(dupPubRelMessageStore -> {
MqttMessage pubRelMessage = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBREL,true,MqttQoS.AT_MOST_ONCE,false,0),
MqttMessageIdVariableHeader.from(dupPubRelMessageStore.getMessageId()),
null
);
channel.writeAndFlush(pubRelMessage);
});
}
复制代码
其余MQTT报文你们对照着工程并对照着MQTT v3.1.1自行查看!
/**
* 用户名和密码认证服务
* @author 穆书伟
*/
@Service
public class AuthServiceImpl implements GrozaAuthService {
private RSAPrivateKey privateKey;
@Override
public boolean checkValid(String username, String password) {
if (StringUtils.isEmpty(username)){
return false;
}
if (StringUtils.isEmpty(password)){
return false;
}
RSA rsa = new RSA(privateKey,null);
String value = rsa.encryptBcd(username, KeyType.PrivateKey);
return value.equals(password) ? true : false;
}
@PostConstruct
public void init() {
privateKey = IoUtil.readObj(AuthServiceImpl.class.getClassLoader().getResourceAsStream("keystore/auth-private.key"));
}
}
复制代码
关于Netty实现高性能IOT服务器(Groza)之精尽代码篇中详解到这里就结束了。
原创不易,若是感受不错,但愿给个推荐!您的支持是我写做的最大动力!
下文会带你们推动Netty实现MQTT协议的IOT服务器。
版权声明:
做者:穆书伟
博客园出处:www.cnblogs.com/sanshengshu…
github出处:github.com/sanshengshu…
我的博客出处:sanshengshui.github.io/