自定义分割符,用:DelimiterBasedFrameDecoder类ide
ByteBuf转String,用StringDecoder类oop
参考代码:线程
//设置链接符/分隔符,换行显示 ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes()); //DelimiterBasedFrameDecoder:自定义分隔符 sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); //设置为字符串形式的解码:将传递的buf改成String sc.pipeline().addLast(new StringDecoder()); //处理数据 sc.pipeline().addLast(new ClientHandler());
完整代码:code
client代码server
public static void main(String[] args) throws InterruptedException { EventLoopGroup worker = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(worker) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { // TODO Auto-generated method stub //设置链接符/分隔符,换行显示 ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes()); //DelimiterBasedFrameDecoder:自定义分隔符 sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); //设置为字符串形式的解码:将传递的buf改成String sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ClientHandler()); } }); //链接端口 ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); cf.channel().writeAndFlush(Unpooled.copiedBuffer("aaa$_".getBytes())); cf.channel().writeAndFlush(Unpooled.copiedBuffer("bbbbb$_".getBytes())); cf.channel().writeAndFlush(Unpooled.copiedBuffer("cccccccc$_".getBytes())); cf.channel().closeFuture().sync(); worker.shutdownGracefully(); }
clientHandler代码blog
public class ClientHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // TODO Auto-generated method stub //super.channelRead(ctx, msg); try { //在传输的时候已经将ByteBuf转为string String str = (String)msg; System.out.println("Client: " + str); } finally { // TODO: handle finally clause ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // TODO Auto-generated method stub cause.printStackTrace(); ctx.close(); } }
Server代码:接口
public static void main(String[] args) throws InterruptedException { //待client链接的线程 EventLoopGroup boss = new NioEventLoopGroup(); //处理事务的线程 EventLoopGroup worker = new NioEventLoopGroup(); //bootstarp辅助类,注册server服务 ServerBootstrap b = new ServerBootstrap(); b.group(boss, worker) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_SNDBUF, 32*1024) .option(ChannelOption.SO_RCVBUF, 32*1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { // TODO Auto-generated method stub //设置链接符,换行显示 ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes()); //DelimiterBasedFrameDecoder:自定义分隔符 sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); //将buf转string sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ServerHandler()); } }); //指定监听接口 ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync(); boss.shutdownGracefully(); worker.shutdownGracefully(); }
ServerHandler代码事务
public class ServerHandler extends ChannelHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // TODO Auto-generated method stub //super.channelRead(ctx, msg); //handler设置了buf转String String str = (String)msg; System.out.println("Serer:" + str); String response = "我是响应的数据$_"; ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes())); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // TODO Auto-generated method stub //super.exceptionCaught(ctx, cause); cause.printStackTrace(); ctx.close(); } }