Netty实践(三):实际场景下的数据通讯

数据通讯的场景:长链接 OR 短链接java

在实际场景中,咱们如何使用Netty进行通讯呢?大体有3种方式:
bootstrap

第一种,使用长链接通道不断开的形式进行通讯,也就是服务器和客户端的通道一直处于开启的状态。若是服务器性能足够好,而且咱们的客户端数量也比较少的状况下,是适合使用长链接的通道。数组

第二种,采用短链接方式,一次性批量提交数据,也就是咱们会把数据保存在本地临时缓冲区或者临时表里。当达到数量时,就进行批量提交;或者经过定时任务轮询提交。这种状况是有弊端的,就是没法作到实时传输。若是应用程序对实时性要求不高,能够考虑使用。服务器

第三种,采用一种特殊的长链接。特殊在哪里呢?在指定的某一时间以内,服务器与某台客户端没有任何通讯,则断开链接,若是断开链接后,客户端又须要向服务器发送请求,那么再次创建链接。这里有点CachedThreadPool的味道。框架

本篇博客将采用Netty来实现第三种方式的数据通讯,接下来咱们一块儿来看看吧~socket



Netty数据通讯代码实例ide


请求消息对象工具

package day3;

import java.io.Serializable;

public class Request implements Serializable{

   private static final long  SerialVersionUID = 1L;
   
   private String id ;
   private String name ;
   private String requestMessage ;
   
   public String getId() {
      return id;
   }
   public void setId(String id) {
      this.id = id;
   }
   public String getName() {
      return name;
   }
   public void setName(String name) {
      this.name = name;
   }
   public String getRequestMessage() {
      return requestMessage;
   }
   public void setRequestMessage(String requestMessage) {
      this.requestMessage = requestMessage;
   }


}


响应消息对象oop

package day3;

import java.io.Serializable;

public class Response implements Serializable{
   
   private static final long serialVersionUID = 1L;
   
   private String id;
   private String name;
   private String responseMessage;
   
   public String getId() {
      return id;
   }
   public void setId(String id) {
      this.id = id;
   }
   public String getName() {
      return name;
   }
   public void setName(String name) {
      this.name = name;
   }
   public String getResponseMessage() {
      return responseMessage;
   }
   public void setResponseMessage(String responseMessage) {
      this.responseMessage = responseMessage;
   }
   

}


编解码处理器
性能

package day3;

import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;

import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

/**
 * Marshalling工厂
 */
public final class MarshallingCodeCFactory {

    /**
     * 建立Jboss Marshalling×××MarshallingDecoder
     * @return MarshallingDecoder
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
       //首先经过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识建立的是java序列化工厂对象。
      final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
      //建立了MarshallingConfiguration对象,配置了版本号为5 
      final MarshallingConfiguration configuration = new MarshallingConfiguration();
      configuration.setVersion(5);
      //根据marshallerFactory和configuration建立provider
      UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
      //构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
      MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
      return decoder;
    }

    /**
     * 建立Jboss Marshalling编码器MarshallingEncoder
     * @return MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
      final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
      final MarshallingConfiguration configuration = new MarshallingConfiguration();
      configuration.setVersion(5);
      MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
      //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
      MarshallingEncoder encoder = new MarshallingEncoder(provider);
      return encoder;
    }
}

注意,在上一篇博客《Netty实践(二):TCP拆包、粘包问题》中,咱们是本身继承ByteToMessageDecoder、MessageToByteEncoder来实现ByteBuff与消息对象的转化的,其实这是有点麻烦的。在实际中,咱们彻底能够利用相关序列化框架(JBoss Marshlling/Protobuf/Kryo/MessagePack)来帮助咱们快速完成编解码,这里我使用的是JBoss Marshalling(jboss-marshalling-1.3.0.CR9.jar+jboss-marshalling-serial-1.3.0.CR9.jar)。具体来讲,客户端和服务端交互的消息对象只须要实现JDK默认的序列化接口,同时利用JBoss Marshalling 生成编码器和×××,用于后续Client/Server端便可。


Client Handler

package day3;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

public class ClientHandler extends ChannelHandlerAdapter{
   
   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      try {
         Response resp = (Response)msg;
         System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());       
      } finally {
         ReferenceCountUtil.release(msg);
      }
   }

   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      ctx.close();
   }
   
}

在这里能够清楚的看到,咱们直接将Object转化成了自定义消息响应对象,可见JBoss Marshalling与Netty结合后,编解码是如此简单。



Client

package day3;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;

import java.util.concurrent.TimeUnit;


/**
 *
 */
public class Client {
   
   private static class SingletonHolder {
      static final Client instance = new Client();
   }
   
   public static Client getInstance(){
      return SingletonHolder.instance;
   }
   
   private EventLoopGroup group;
   private Bootstrap b;

   //经过ChannelFuture实现读写操做
   private ChannelFuture cf ;
   
   private Client(){
         group = new NioEventLoopGroup();
         b = new Bootstrap();
         b.group(group)
          .channel(NioSocketChannel.class)
          .handler(new LoggingHandler(LogLevel.INFO))
          .handler(new ChannelInitializer<SocketChannel>() {
               @Override
               protected void initChannel(SocketChannel sc) throws Exception {

                  sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                  sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                  //超时handler(当服务器端与客户端在指定时间以上没有任何进行通讯,则会关闭相应的通道,主要为减少服务端资源占用)
                  sc.pipeline().addLast(new ReadTimeoutHandler(3)); 
                  sc.pipeline().addLast(new ClientHandler());
               }
          });
   }
   
   public void connect(){
      try {
         this.cf = b.connect("127.0.0.1", 8765).sync();
         System.out.println("远程服务器已经链接, 能够进行数据交换..");            
      } catch (Exception e) {
         e.printStackTrace();
      }
   }

   //这里是通道关闭,再次创建链接的核心代码
   public ChannelFuture getChannelFuture(){
      
      if(this.cf == null){
         this.connect();
      }
      if(!this.cf.channel().isActive()){
         this.connect();
      }
      
      return this.cf;
   }
   
   public static void main(String[] args) throws Exception{

      final Client c = Client.getInstance();

      //注意client好像没有调用connect()方法进行链接,可是实际上在下面的代码中作了
      ChannelFuture cf = c.getChannelFuture();

      for(int i = 1; i <= 3; i++ ){
         Request request = new Request();
         request.setId("" + i);
         request.setName("pro" + i);
         request.setRequestMessage("数据信息" + i);
         cf.channel().writeAndFlush(request);
         TimeUnit.SECONDS.sleep(4);
      }

      cf.channel().closeFuture().sync();
      
      //通道关闭后,经过另外一个线程模拟客户端再次创建链接发送请求
      new Thread(new Runnable() {
         @Override
         public void run() {
            try {
               System.out.println("进入子线程...");
               ChannelFuture cf = c.getChannelFuture();
               System.out.println(cf.channel().isActive());
               System.out.println(cf.channel().isOpen());
               
               //再次发送数据
               Request request = new Request();
               request.setId("" + 4);
               request.setName("pro" + 4);
               request.setRequestMessage("数据信息" + 4);
               cf.channel().writeAndFlush(request);               
               cf.channel().closeFuture().sync();
               System.out.println("子线程结束...");

            } catch (InterruptedException e) {
               e.printStackTrace();
            }
         }
      }).start();
      
      System.out.println("断开链接,主线程结束..");
      
   }
   
   
   
}

这里对Client进行了初步的封装,采用静态内部类实现单例。

Client的Handler不单单有Marshalling的编×××,还加入了Netty自带的ReadTimeoutHandler,这是客户端与服务端一段时间没有通讯就断开链接的依据。从这里也看到Netty的强大之处了,经过提供一些预约义的Handler让你的代码变得简单,只须要专一于业务实现便可。客户端超时断开通道后,如何再次创建链接进行通讯呢?经过getChannelFuture()你会知道。

客户端代码模拟了一个线程通讯超时,关闭通道后,另外一个线程与服务器端再次通讯。



Server Handler

package day3;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class ServerHandler extends ChannelHandlerAdapter{

   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      Request request = (Request)msg;
      System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage());
      Response response = new Response();
      response.setId(request.getId());
      response.setName("response" + request.getId());
      response.setResponseMessage("响应内容" + request.getId());
      ctx.writeAndFlush(response);//.addListener(ChannelFutureListener.CLOSE);
   }

   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      ctx.close();
   }

   
   
}


Server

package day3;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;

public class Server {

   public static void main(String[] args) throws Exception{
      
      EventLoopGroup pGroup = new NioEventLoopGroup();
      EventLoopGroup cGroup = new NioEventLoopGroup();
      
      ServerBootstrap b = new ServerBootstrap();
      b.group(pGroup, cGroup)
       .channel(NioServerSocketChannel.class)
       .option(ChannelOption.SO_BACKLOG, 1024)
       //设置日志
       .handler(new LoggingHandler(LogLevel.INFO))
       .childHandler(new ChannelInitializer<SocketChannel>() {
         protected void initChannel(SocketChannel sc) throws Exception {
            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
            sc.pipeline().addLast(new ReadTimeoutHandler(3));
            sc.pipeline().addLast(new ServerHandler());
         }
      });
      
      ChannelFuture cf = b.bind(8765).sync();
      
      cf.channel().closeFuture().sync();
      pGroup.shutdownGracefully();
      cGroup.shutdownGracefully();
      
   }
}



运行结果

wKiom1h63b_ROhaOAAB6W0wcmzc404.png

wKiom1h63c3ALRKyAAA7IlAsS98780.png


说明:因为客户端一开始是发送3条消息给服务端,可是每条消息发送间隔4S,因为超时设置为3S,因而发送第一条消息后,通道便断开链接。接下来,客户端又启动了一个线程再次与服务端通讯。



到这里,这篇博客就结束了,对你有用吗?

下周咱们再来看Netty在心跳检测方面的应用,^_^

相关文章
相关标签/搜索