基于SpringBoot,借助Netty控制长连接,使用WebSocket协议作一个实时的聊天室。java
项目统一登陆路径: http://localhost:8080/chat/netty
用户名随机生成,离线调用异步方法,数据写操做,登陆显示历史聊天消息
项目名:InChat
项目地址:https://github.com/UncleCatMy...
项目介绍:基于Netty4与SpringBoot,聊天室WebSocket(文字图片)加API调用Netty长连接执行发送消息(在线数、用户列表)、Iot物联网-MQTT协议、TCP/IP协议单片机通讯,异步存储聊天数据mysql
public class RandomNameUtil { private static Random ran = new Random(); private final static int delta = 0x9fa5 - 0x4e00 + 1; public static char getName(){ return (char)(0x4e00 + ran.nextInt(delta)); } }
spring: datasource: driver-class-name: com.mysql.jdbc.Driver username: root password: root url: jdbc:mysql://localhost:3306/nettychat?characterEncoding=utf-8&useSSL=false jpa: show-sql: true netty: port: 8090 #监听端口 bossThread: 2 #线程数 workerThread: 2 #线程数 keepalive: true #保持链接 backlog: 100
SET FOREIGN_KEY_CHECKS=0; -- ---------------------------- -- Table structure for user_msg -- ---------------------------- DROP TABLE IF EXISTS `user_msg`; CREATE TABLE `user_msg` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL, `msg` varchar(255) DEFAULT NULL, `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8mb4; -- ---------------------------- -- Records of user_msg -- ---------------------------- INSERT INTO `user_msg` VALUES ('1', '亪', '今天不开心', '2018-08-14 14:26:02', '2018-08-14 14:26:02'); INSERT INTO `user_msg` VALUES ('2', '祐', '不错呀', '2018-08-14 15:09:40', '2018-08-14 15:09:40'); INSERT INTO `user_msg` VALUES ('3', '搈', '开心 开心', '2018-08-14 15:09:40', '2018-08-14 15:09:40'); INSERT INTO `user_msg` VALUES ('4', '兇', '能够的,后面再作个深刻一点的', '2018-08-14 15:18:35', '2018-08-14 15:18:35'); INSERT INTO `user_msg` VALUES ('5', '倎', '开源这个项目', '2018-08-14 15:18:35', '2018-08-14 15:18:35'); INSERT INTO `user_msg` VALUES ('6', '蝡', '1-someting', '2018-08-14 15:24:28', '2018-08-14 15:24:28'); INSERT INTO `user_msg` VALUES ('7', '弔', '不行呀', '2018-08-14 15:24:29', '2018-08-14 15:24:29'); INSERT INTO `user_msg` VALUES ('8', '習', '能够的', '2018-08-14 15:26:03', '2018-08-14 15:26:03'); INSERT INTO `user_msg` VALUES ('9', '蔫', '开源这个项目', '2018-08-14 15:26:03', '2018-08-14 15:26:03');
@Data @Entity @DynamicUpdate public class UserMsg implements Serializable { private static final long serialVersionUID = 4133316147283239759L; @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Integer id; private String name; private String msg; private Date createTime; private Date updateTime; }
public interface UserMsgRepository extends JpaRepository<UserMsg,Integer> { //本次未使用到自定义方法,JPA原生便可 }
我没有去配置虚拟机环境,就本地模拟了git
保存用户名称与连接随机ID
@Component public class LikeRedisTemplate { private Map<Object,Object> RedisMap = new ConcurrentHashMap<>(); public void save(Object id,Object name){ RedisMap.put(id,name); } public void delete(Object id){ RedisMap.remove(id); } public Object get(Object id){ return RedisMap.get(id); } }
聊天内容临时存储
@Component public class LikeSomeCacheTemplate { private Set<UserMsg> SomeCache = new LinkedHashSet<>(); public void save(Object user,Object msg){ UserMsg userMsg = new UserMsg(); userMsg.setName(String.valueOf(user)); userMsg.setMsg(String.valueOf(msg)); SomeCache.add(userMsg); } public Set<UserMsg> cloneCacheMap(){ return SomeCache; } public void clearCacheMap(){ SomeCache.clear(); } }
@Component public class MsgAsyncTesk { @Autowired private LikeSomeCacheTemplate cacheTemplate; @Autowired private UserMsgRepository userMsgRepository; @Async public Future<Boolean> saveChatMsgTask() throws Exception{ // System.out.println("启动异步任务"); Set<UserMsg> set = cacheTemplate.cloneCacheMap(); for (UserMsg item:set){ //保存用户消息 userMsgRepository.save(item); } //清空临时缓存 cacheTemplate.clearCacheMap(); return new AsyncResult<>(true); } }
@Data @Component @ConfigurationProperties(prefix = "netty") public class NettyAccountConfig { private int port; private int bossThread; private int workerThread; private boolean keepalive; private int backlog; }
@Component @Qualifier("textWebSocketFrameHandler") @ChannelHandler.Sharable public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{ public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Autowired private LikeRedisTemplate redisTemplate; @Autowired private LikeSomeCacheTemplate cacheTemplate; @Autowired private MsgAsyncTesk msgAsyncTesk; @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { Channel incoming = ctx.channel(); String uName = String.valueOf(redisTemplate.get(incoming.id())); for (Channel channel : channels) { //将当前每一个聊天内容进行存储 System.out.println("存储数据:"+uName+"-"+msg.text()); cacheTemplate.save(uName,msg.text()); if (channel != incoming){ channel.writeAndFlush(new TextWebSocketFrame("[" + uName + "]" + msg.text())); } else { channel.writeAndFlush(new TextWebSocketFrame("[you]" + msg.text() )); } } } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println(ctx.channel().remoteAddress()); String uName = String.valueOf(RandomNameUtil.getName()); //用来获取一个随机的用户名,能够用其余方式代替 //新用户接入 Channel incoming = ctx.channel(); for (Channel channel : channels) { channel.writeAndFlush(new TextWebSocketFrame("[新用户] - " + uName + " 加入")); } redisTemplate.save(incoming.id(),uName); //存储用户 channels.add(ctx.channel()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); String uName = String.valueOf(redisTemplate.get(incoming.id())); //用户离开 for (Channel channel : channels) { channel.writeAndFlush(new TextWebSocketFrame("[用户] - " + uName + " 离开")); } redisTemplate.delete(incoming.id()); //删除用户 channels.remove(ctx.channel()); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); System.out.println("用户:"+redisTemplate.get(incoming.id())+"在线"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); System.out.println("用户:"+redisTemplate.get(incoming.id())+"掉线"); msgAsyncTesk.saveChatMsgTask(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Channel incoming = ctx.channel(); System.out.println("用户:" + redisTemplate.get(incoming.id()) + "异常"); cause.printStackTrace(); ctx.close(); } }
@Component @Qualifier("somethingChannelInitializer") public class NettyWebSocketChannelInitializer extends ChannelInitializer<SocketChannel> { @Autowired private TextWebSocketFrameHandler textWebSocketFrameHandler; @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpObjectAggregator(65536)); pipeline.addLast(new ChunkedWriteHandler()); pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); pipeline.addLast(textWebSocketFrameHandler); //这里不能使用new,否则在handler中不能注入依赖 } }
@Component public class NettyConfig { @Autowired private NettyAccountConfig nettyAccountConfig; @Bean(name = "bossGroup", destroyMethod = "shutdownGracefully") public NioEventLoopGroup bossGroup(){ return new NioEventLoopGroup(nettyAccountConfig.getBossThread()); } @Bean(name = "workerGroup", destroyMethod = "shutdownGracefully") public NioEventLoopGroup workerGroup(){ return new NioEventLoopGroup(nettyAccountConfig.getWorkerThread()); } @Bean(name = "tcpSocketAddress") public InetSocketAddress tcpPost(){ return new InetSocketAddress(nettyAccountConfig.getPort()); } @Bean(name = "tcpChannelOptions") public Map<ChannelOption<?>, Object> tcpChannelOptions(){ Map<ChannelOption<?>, Object> options = new HashMap<ChannelOption<?>, Object>(); options.put(ChannelOption.SO_KEEPALIVE, nettyAccountConfig.isKeepalive()); options.put(ChannelOption.SO_BACKLOG, nettyAccountConfig.getBacklog()); return options; } @Autowired @Qualifier("somethingChannelInitializer") private NettyWebSocketChannelInitializer nettyWebSocketChannelInitializer; @Bean(name = "serverBootstrap") public ServerBootstrap bootstrap(){ ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup(), workerGroup()) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.DEBUG)) .childHandler(nettyWebSocketChannelInitializer); Map<ChannelOption<?>, Object> tcpChannelOptions = tcpChannelOptions(); Set<ChannelOption<?>> keySet = tcpChannelOptions.keySet(); for (@SuppressWarnings("rawtypes") ChannelOption option : keySet) { b.option(option, tcpChannelOptions.get(option)); } return b; } }
@Data @Component public class TCPServer { @Autowired @Qualifier("serverBootstrap") private ServerBootstrap serverBootstrap; @Autowired @Qualifier("tcpSocketAddress") private InetSocketAddress tcpPort; private Channel serverChannel; public void start() throws Exception { serverChannel = serverBootstrap.bind(tcpPort).sync().channel().closeFuture().sync().channel(); } @PreDestroy public void stop() throws Exception { serverChannel.close(); serverChannel.parent().close(); } }
@SpringBootApplication @EnableScheduling //启动异步任务 public class NettychatApplication { public static void main(String[] args) throws Exception{ ConfigurableApplicationContext context = SpringApplication.run(NettychatApplication.class, args); //注入NettyConfig 获取对应Bean NettyConfig nettyConfig = context.getBean(NettyConfig.class); //注入TCPServer 获取对应Bean TCPServer tcpServer = context.getBean(TCPServer.class); //启动websocket的服务 tcpServer.start(); } }
项目名:InChat
项目地址:https://github.com/UncleCatMy...
项目介绍:基于Netty4与SpringBoot,聊天室WebSocket(文字图片)加API调用Netty长连接执行发送消息(在线数、用户列表)、Iot物联网-MQTT协议、TCP/IP协议单片机通讯,异步存储聊天数据github
若是本文对你有所帮助,欢迎关注我的技术公众号web