Netty的介绍就不在这里阐述了,前面也写过关于Netty的文章:
Netty(一) springboot整合Netty做心跳检测
Netty(二) springboot 整合netty编写时间服务器
这里不做过多的介绍,代码里有相应的注释,
首先添加依赖:
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.21.Final</version> </dependency>
WebsocketServer
@Component public class WebsocketServer { private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketServer.class); //初始化主线程(boss线程) NioEventLoopGroup mainGroup = new NioEventLoopGroup(); //初始化从线程池(work线程) NioEventLoopGroup subGroup = new NioEventLoopGroup(); @PostConstruct public void start() { try { //创建服务启动器 ServerBootstrap serverBootstrap = new ServerBootstrap(); //指定使用主线程和从线程 serverBootstrap.group(mainGroup, subGroup) //指定使用NIO通道类型 .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(9001)) //保持长连接 .childOption(ChannelOption.SO_KEEPALIVE, true) //指定通道初始化器用来加载当Channel收到事件消息后,如何进行业务处理 .childHandler(new WsServerInitializer()); //绑定端口启动服务器,并等待服务启动 ChannelFuture future = serverBootstrap.bind().sync(); if (future.isSuccess()) { LOGGER.info("启动 Netty 成功"); } //等待服务器关闭 //future.channel().closeFuture().sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 销毁 */ @PreDestroy public void destroy() { mainGroup.shutdownGracefully().syncUninterruptibly(); subGroup.shutdownGracefully().syncUninterruptibly(); LOGGER.info("关闭 Netty 成功"); } }
WsServerInitializer(通道初始化)
/** * Created by haoxy on 2019/1/4. * E-mail:[email protected] * github:https://github.com/haoxiaoyong1014 * 通道初始化器 * 用来加载通道处理器 */ public class WsServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); //用于支持 http协议 //websocket 基于http协议,需要 http的编码器 pipeline.addLast(new HttpServerCodec()) //对大数据流的支持 .addLast(new ChunkedWriteHandler()) //添加对HTTP请求和响应的聚合器: 只要Netty进行编码都需要使用 //对HttpMessage进行聚合,聚合成FullHttpRequest获取FullHttpResponse .addLast(new HttpObjectAggregator(1024 * 64)) //-----------支持 webSocket---------- //需要指定接收请求的路由,处理握手动作(close,ping pong),ping+pong=心跳 //必须使用 ws 后缀结尾的 url 才能访问 .addLast(new WebSocketServerProtocolHandler("/ws")) //添加自定义的 handler .addLast(new ChatHandler()); } }
ChatHandler(自定义业务处理类)
/** * Created by haoxy on 2019/1/4. * E-mail:[email protected] * github:https://github.com/haoxiaoyong1014 */ //TextWebSocketFrame 在 netty 中是用于webSocket专门处理文本对象,frame是消息的载体. public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { private final static Logger LOGGER = LoggerFactory.getLogger(ChatHandler.class); //用于记录和管理所有客户端的Channel private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd hh:MM"); //当接收到客户端发过来的消息就会触发此回调 @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception { String content = textWebSocketFrame.text(); System.out.println("接收到的数据: " + content); //将消息发给其他客户端排除自己 Channel channel = channelHandlerContext.channel(); for (Channel ch : channels) { //排除当前通道 if (channel != ch) { ch.writeAndFlush(new TextWebSocketFrame(sdf.format(new Date()) + ":" + content)); } } /*//将接收的消息发送所有的客户端 for (Channel channel : channels) { channel.writeAndFlush(new TextWebSocketFrame(sdf.format(new Date()) + ":" + content)); }*/ } //当有新的客户端连接服务器之后,就会自动调用这个方法 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { channels.add(ctx.channel()); } }
chat.html(前端代码)
<body> <input type="text" id="message"> <input type="button" value="发送消息" onclick="sendMsg()"> <br/> 接收到消息: <p id="server_message" style="background-color: #AAAAAA"></p> <script> var websocket = null; //判断当前浏览器是否支持 webSocket if (window.WebSocket) { websocket = new WebSocket("ws://127.0.0.1:9001/ws"); websocket.onopen = function (ev) { console.log("建立连接"); } websocket.onclose = function (ev) { console.log("断开连接"); } websocket.onmessage = function (ev) { console.log("接收到服务器的消息" + ev.data); var server_message = document.getElementById("server_message"); server_message.innerHTML += ev.data + "<br/>"; } } else { alert("当前浏览器不支持 webSocket") } function sendMsg() { var message = document.getElementById("message"); websocket.send(message.value) } </script> </body>
案例效果图:
窗口A给窗口B发送消息:
窗口B回复窗口A的消息:
案例项目地址: