本篇文章主要介绍的是SpringBoot整合Netty以及使用Protobuf进行数据传输的相关内容。Protobuf会简单的介绍下用法,至于Netty在以前的文章中已经简单的介绍过了,这里就再也不过多细说了。html
protocolbuffer(如下简称PB)是google 的一种数据交换的格式,它独立于语言,独立于平台。google 提供了多种语言的实现:java、c#、c++、go 和python,每一种实现都包含了相应语言的编译器以及库文件。因为它是一种二进制的格式,比使用 xml进行数据交换快许多。能够把它用于分布式应用之间的数据通讯或者异构环境下的数据交换。做为一种效率和兼容性都很优秀的二进制数据传输格式,能够用于诸如网络传输、配置文件、数据存储等诸多领域。java
官方地址: https://github.com/google/protobufpython
这里的使用就只介绍Java相关的使用。 首先咱们须要创建一个proto文件,在该文件定义咱们须要传输的文件。 例如咱们须要定义一个用户的信息,包含的字段主要有编号、名称、年龄。 那么该protobuf文件的格式以下: 注:这里使用的是proto3,相关的注释我已写了,这里便再也不过多讲述了。须要注意一点的是proto文件和生成的Java文件名称不能一致!c++
syntax = "proto3";
// 生成的包名
option java_package="com.pancm.protobuf";
//生成的java名
option java_outer_classname = "UserInfo";
message UserMsg {
// ID
int32 id = 1;
// 姓名
string name = 2;
// 年龄
int32 age = 3;
// 状态
int32 state = 4;
}
复制代码
建立好该文件以后,咱们把该文件和protoc.exe(生成Java文件的软件)放到E盘目录下的protobuf文件夹下,而后再到该目录的dos界面下输入:protoc.exe --java_out=文件绝对路径名称
。 例如:git
protoc.exe --java_out=E:\protobuf User.proto
复制代码
输入完以后,回车便可在同级目录看到已经生成好的Java文件,而后将该文件放到项目中该文件指定的路径下便可。github
注:生成protobuf的文件软件和测试的protobuf文件我也整合到该项目中了,能够直接获取的。spring
Java文件生成好以后,咱们再来看怎么使用。 这里我就直接贴代码了,而且将注释写在代码中,应该更容易理解些吧。。。 代码示例:json
// 按照定义的数据结构,建立一个对象
UserInfo.UserMsg.Builder userInfo = UserInfo.UserMsg.newBuilder();
userInfo.setId(1);
userInfo.setName("xuwujing");
userInfo.setAge(18);
UserInfo.UserMsg userMsg = userInfo.build();
// 将数据写到输出流
ByteArrayOutputStream output = new ByteArrayOutputStream();
userMsg.writeTo(output);
// 将数据序列化后发送
byte[] byteArray = output.toByteArray();
// 接收到流并读取
ByteArrayInputStream input = new ByteArrayInputStream(byteArray);
// 反序列化
UserInfo.UserMsg userInfo2 = UserInfo.UserMsg.parseFrom(input);
System.out.println("id:" + userInfo2.getId());
System.out.println("name:" + userInfo2.getName());
System.out.println("age:" + userInfo2.getAge());
复制代码
注:这里说明一点,由于protobuf是经过二进制进行传输,因此须要注意下相应的编码。还有使用protobuf也须要注意一下一次传输的最大字节长度。bootstrap
输出结果:c#
id:1
name:xuwujing
age:18
复制代码
说明:若是想直接获取工程那么能够直接跳到底部,经过连接下载工程代码。
环境要求 JDK::1.8 Netty::4.0或以上(不包括5) Protobuf:3.0或以上
若是对Netty不熟的话,能够看看我以前写的一些文章。大神请无视~。~ 地址:https://blog.csdn.net/column/details/17640.html
首先仍是Maven的相关依赖:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<netty.version>4.1.22.Final</netty.version>
<protobuf.version>3.5.1</protobuf.version>
<springboot>1.5.9.RELEASE</springboot>
<fastjson>1.2.41</fastjson>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>${springboot}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${springboot}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<version>${springboot}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
复制代码
添加了相应的maven依赖以后,配置文件这块暂时没有什么能够添加的,由于暂时就一个监听的端口而已。
代码模块主要分为服务端和客户端。 主要实现的业务逻辑: 服务端启动成功以后,客户端也启动成功,这时服务端会发送一条protobuf格式的信息给客户端,而后客户端给予相应的应答。客户端与服务端链接成功以后,客户端每一个一段时间会发送心跳指令给服务端,告诉服务端该客户端还存过中,若是客户端没有在指定的时间发送信息,服务端会关闭与该客户端的链接。当客户端没法链接到服务端以后,会每隔一段时间去尝试重连,只到重连成功!
首先是编写服务端的启动类,相应的注释在代码中写得很详细了,这里也再也不过多讲述了。不过须要注意的是,在以前的我写的Netty文章中,是经过main方法直接启动服务端,所以是直接new一个对象的。而在和SpringBoot整合以后,咱们须要将Netty交给springBoot去管理,因此这里就用了相应的注解。 代码以下:
@Service("nettyServer")
public class NettyServer {
private static final int port = 9876; // 设置服务端端口
private static EventLoopGroup boss = new NioEventLoopGroup(); // 经过nio方式来接收链接和处理链接
private static EventLoopGroup work = new NioEventLoopGroup(); // 经过nio方式来接收链接和处理链接
private static ServerBootstrap b = new ServerBootstrap();
@Autowired
private NettyServerFilter nettyServerFilter;
public void run() {
try {
b.group(boss, work);
b.channel(NioServerSocketChannel.class);
b.childHandler(nettyServerFilter); // 设置过滤器
// 服务器绑定端口监听
ChannelFuture f = b.bind(port).sync();
System.out.println("服务端启动成功,端口是:" + port);
// 监听服务器关闭监听
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 关闭EventLoopGroup,释放掉全部资源包括建立的线程
work.shutdownGracefully();
boss.shutdownGracefully();
}
}
}
复制代码
服务端主类编写完毕以后,咱们再来设置下相应的过滤条件。 这里须要继承Netty中ChannelInitializer类,而后重写initChannel该方法,进行添加相应的设置,如心跳超时设置,传输协议设置,以及相应的业务实现类。 代码以下:
@Component
public class NettyServerFilter extends ChannelInitializer<SocketChannel> {
@Autowired
private NettyServerHandler nettyServerHandler;
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline ph = ch.pipeline();
//入参说明: 读超时时间、写超时时间、全部类型的超时时间、时间格式
ph.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
// 解码和编码,应和客户端一致
//传输的协议 Protobuf
ph.addLast(new ProtobufVarint32FrameDecoder());
ph.addLast(new ProtobufDecoder(UserMsg.getDefaultInstance()));
ph.addLast(new ProtobufVarint32LengthFieldPrepender());
ph.addLast(new ProtobufEncoder());
//业务逻辑实现类
ph.addLast("nettyServerHandler", nettyServerHandler);
}
}
复制代码
服务相关的设置的代码写完以后,咱们再来编写主要的业务代码。 使用Netty编写业务层的代码,咱们须要继承ChannelInboundHandlerAdapter 或SimpleChannelInboundHandler类,在这里顺便说下它们两的区别吧。 继承SimpleChannelInboundHandler类以后,会在接收到数据后会自动release掉数据占用的Bytebuffer资源。而且继承该类须要指定数据格式。 而继承ChannelInboundHandlerAdapter则不会自动释放,须要手动调用ReferenceCountUtil.release()等方法进行释放。继承该类不须要指定数据格式。 因此在这里,我的推荐服务端继承ChannelInboundHandlerAdapter,手动进行释放,防止数据未处理完就自动释放了。并且服务端可能有多个客户端进行链接,而且每个客户端请求的数据格式都不一致,这时即可以进行相应的处理。 客户端根据状况能够继承SimpleChannelInboundHandler类。好处是直接指定好传输的数据格式,就不须要再进行格式的转换了。
代码以下:
@Service("nettyServerHandler")
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/** 空闲次数 */
private int idle_count = 1;
/** 发送次数 */
private int count = 1;
/**
* 创建链接时,发送一条消息
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("链接的客户端地址:" + ctx.channel().remoteAddress());
UserInfo.UserMsg userMsg = UserInfo.UserMsg.newBuilder().setId(1).setAge(18).setName("xuwujing").setState(0)
.build();
ctx.writeAndFlush(userMsg);
super.channelActive(ctx);
}
/**
* 超时处理 若是5秒没有接受客户端的心跳,就触发; 若是超过两次,则直接关闭;
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
if (obj instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) obj;
if (IdleState.READER_IDLE.equals(event.state())) { // 若是读通道处于空闲状态,说明没有接收到心跳命令
System.out.println("已经5秒没有接收到客户端的信息了");
if (idle_count > 1) {
System.out.println("关闭这个不活跃的channel");
ctx.channel().close();
}
idle_count++;
}
} else {
super.userEventTriggered(ctx, obj);
}
}
/**
* 业务逻辑处理
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("第" + count + "次" + ",服务端接受的消息:" + msg);
try {
// 若是是protobuf类型的数据
if (msg instanceof UserMsg) {
UserInfo.UserMsg userState = (UserInfo.UserMsg) msg;
if (userState.getState() == 1) {
System.out.println("客户端业务处理成功!");
} else if(userState.getState() == 2){
System.out.println("接受到客户端发送的心跳!");
}else{
System.out.println("未知命令!");
}
} else {
System.out.println("未知数据!" + msg);
return;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
ReferenceCountUtil.release(msg);
}
count++;
}
/**
* 异常处理
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
复制代码
还有个服务端的启动类,以前是经过main方法直接启动, 不过这里改为了经过springBoot进行启动,差异不大。 代码以下:
@SpringBootApplication
public class NettyServerApp {
public static void main(String[] args) {
// 启动嵌入式的 Tomcat 并初始化 Spring 环境及其各 Spring 组件
ApplicationContext context = SpringApplication.run(NettyServerApp.class, args);
NettyServer nettyServer = context.getBean(NettyServer.class);
nettyServer.run();
}
}
复制代码
到这里服务端相应的代码就编写完毕了。
客户端这边的代码和服务端的不少地方都相似,我就再也不过多细说了,主要将一些不一样的代码拿出来简单的讲述下。 首先是客户端的主类,基本和服务端的差很少,也就是多了监听的端口和一个监听器(用来监听是否和服务端断开链接,用于重连)。 主要实现的代码逻辑以下:
public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup) {
ChannelFuture f = null;
try {
if (bootstrap != null) {
bootstrap.group(eventLoopGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(nettyClientFilter);
bootstrap.remoteAddress(host, port);
f = bootstrap.connect().addListener((ChannelFuture futureListener) -> {
final EventLoop eventLoop = futureListener.channel().eventLoop();
if (!futureListener.isSuccess()) {
System.out.println("与服务端断开链接!在10s以后准备尝试重连!");
eventLoop.schedule(() -> doConnect(new Bootstrap(), eventLoop), 10, TimeUnit.SECONDS);
}
});
if(initFalg){
System.out.println("Netty客户端启动成功!");
initFalg=false;
}
// 阻塞
f.channel().closeFuture().sync();
}
} catch (Exception e) {
System.out.println("客户端链接失败!"+e.getMessage());
}
}
复制代码
注:监听器这块的实现用的是JDK1.8的写法。
客户端过滤其这块基本和服务端一直。不过须要注意的是,传输协议、编码和解码应该一致,还有心跳的读写时间应该小于服务端所设置的时间。 改动的代码以下:
ChannelPipeline ph = ch.pipeline();
/*
* 解码和编码,应和服务端一致
* */
//入参说明: 读超时时间、写超时时间、全部类型的超时时间、时间格式
ph.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));
复制代码
客户端的业务代码逻辑。 主要实现的几点逻辑是心跳按时发送以及解析服务发送的protobuf格式的数据。
这里比服务端多个个注解, 该注解Sharable主要是为了多个handler能够被多个channel安全地共享,也就是保证线程安全。 废话就很少说了,代码以下:
@Service("nettyClientHandler")
@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Autowired
private NettyClient nettyClient;
/** 循环次数 */
private int fcount = 1;
/**
* 创建链接时
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("创建链接时:" + new Date());
ctx.fireChannelActive();
}
/**
* 关闭链接时
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("关闭链接时:" + new Date());
final EventLoop eventLoop = ctx.channel().eventLoop();
nettyClient.doConnect(new Bootstrap(), eventLoop);
super.channelInactive(ctx);
}
/**
* 心跳请求处理 每4秒发送一次心跳请求;
*
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
System.out.println("循环请求的时间:" + new Date() + ",次数" + fcount);
if (obj instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) obj;
if (IdleState.WRITER_IDLE.equals(event.state())) { // 若是写通道处于空闲状态,就发送心跳命令
UserMsg.Builder userState = UserMsg.newBuilder().setState(2);
ctx.channel().writeAndFlush(userState);
fcount++;
}
}
}
/**
* 业务逻辑处理
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 若是不是protobuf类型的数据
if (!(msg instanceof UserMsg)) {
System.out.println("未知数据!" + msg);
return;
}
try {
// 获得protobuf的数据
UserInfo.UserMsg userMsg = (UserInfo.UserMsg) msg;
// 进行相应的业务处理。。。
// 这里就从简了,只是打印而已
System.out.println(
"客户端接受到的用户信息。编号:" + userMsg.getId() + ",姓名:" + userMsg.getName() + ",年龄:" + userMsg.getAge());
// 这里返回一个已经接受到数据的状态
UserMsg.Builder userState = UserMsg.newBuilder().setState(1);
ctx.writeAndFlush(userState);
System.out.println("成功发送给服务端!");
} catch (Exception e) {
e.printStackTrace();
} finally {
ReferenceCountUtil.release(msg);
}
}
}
复制代码
那么到这里客户端的代码也编写完毕了。
首先启动服务端,而后再启动客户端。 咱们来看看结果是否如上述所说。
服务端输出结果:
服务端启动成功,端口是:9876
链接的客户端地址:/127.0.0.1:53319
第1次,服务端接受的消息:state: 1
客户端业务处理成功!
第2次,服务端接受的消息:state: 2
接受到客户端发送的心跳!
第3次,服务端接受的消息:state: 2
接受到客户端发送的心跳!
第4次,服务端接受的消息:state: 2
接受到客户端发送的心跳!
复制代码
客户端输入结果:
Netty客户端启动成功!
创建链接时:Mon Jul 16 23:31:58 CST 2018
客户端接受到的用户信息。编号:1,姓名:xuwujing,年龄:18
成功发送给服务端!
循环请求的时间:Mon Jul 16 23:32:02 CST 2018,次数1
循环请求的时间:Mon Jul 16 23:32:06 CST 2018,次数2
循环请求的时间:Mon Jul 16 23:32:10 CST 2018,次数3
循环请求的时间:Mon Jul 16 23:32:14 CST 2018,次数4
复制代码
经过打印信息能够看出如上述所说。
接下来咱们再来看看客户端是否可以实现重连。 先启动客户端,再启动服务端。
客户端输入结果:
Netty客户端启动成功!
与服务端断开链接!在10s以后准备尝试重连!
客户端链接失败!AbstractChannel$CloseFuture@1fbaa3ac(incomplete)
创建链接时:Mon Jul 16 23:41:33 CST 2018
客户端接受到的用户信息。编号:1,姓名:xuwujing,年龄:18
成功发送给服务端!
循环请求的时间:Mon Jul 16 23:41:38 CST 2018,次数1
循环请求的时间:Mon Jul 16 23:41:42 CST 2018,次数2
循环请求的时间:Mon Jul 16 23:41:46 CST 2018,次数3
复制代码
服务端输出结果:
服务端启动成功,端口是:9876
链接的客户端地址:/127.0.0.1:53492
第1次,服务端接受的消息:state: 1
客户端业务处理成功!
第2次,服务端接受的消息:state: 2
接受到客户端发送的心跳!
第3次,服务端接受的消息:state: 2
接受到客户端发送的心跳!
第4次,服务端接受的消息:state: 2
复制代码
结果也如上述所说!
关于SpringBoot整合Netty使用Protobuf进行数据传输到这里就结束了。 SpringBoot整合Netty使用Protobuf进行数据传输的项目工程地址: https://github.com/xuwujing/springBoot-study/tree/master/springboot-netty-protobuf
对了,也有不使用springBoot整合的Netty项目工程地址: https://github.com/xuwujing/Netty-study/tree/master/Netty-protobuf
原创不易,若是感受不错,但愿给个推荐!您的支持是我写做的最大动力! 版权声明: 做者:虚无境 博客园出处:http://www.cnblogs.com/xuwujing CSDN出处:http://blog.csdn.net/qazwsxpcm 我的博客出处:http://www.panchengming.com