netty的pipeline处理链上的handler:须要IdleStateHandler心跳检测channel是否有效,以及处理登陆认证的UserAuthHandler和消息处理MessageHandlerjava
protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(defLoopGroup, //编码解码器 new HttpServerCodec(), //将多个消息转换成单一的消息对象 new HttpObjectAggregator(65536), //支持异步发送大的码流,通常用于发送文件流 new ChunkedWriteHandler(), //检测链路是否读空闲,配合心跳handler检测channel是否正常 new IdleStateHandler(60, 0, 0), //处理握手和认证 new UserAuthHandler(), //处理消息的发送 new MessageHandler() ); }
对于全部连进来的channel,咱们须要保存起来,日后的群发消息须要依靠这些channelgit
public static void addChannel(Channel channel) { String remoteAddr = NettyUtil.parseChannelRemoteAddr(channel); System.out.println("addChannel:" + remoteAddr); if (!channel.isActive()) { logger.error("channel is not active, address: {}", remoteAddr); } UserInfo userInfo = new UserInfo(); userInfo.setAddr(remoteAddr); userInfo.setChannel(channel); userInfo.setTime(System.currentTimeMillis()); userInfos.put(channel, userInfo); }
登陆后,channel就变成有效的channel,无效的channel以后将会丢弃github
public static boolean saveUser(Channel channel, String nick, String password) { UserInfo userInfo = userInfos.get(channel); if (userInfo == null) { return false; } if (!channel.isActive()) { logger.error("channel is not active, address: {}, nick: {}", userInfo.getAddr(), nick); return false; } // 验证用户名和密码 if (nick == null || password == null) { return false; } LambdaQueryWrapper<Account> lambdaQueryWrapper = new LambdaQueryWrapper<>(); lambdaQueryWrapper.eq(Account::getUsername, nick).eq(Account::getPassword, password); Account account = accountMapperStatic.selectOne(lambdaQueryWrapper); if (account == null) { return false; } // 增长一个认证用户 userCount.incrementAndGet(); userInfo.setNick(nick); userInfo.setAuth(true); userInfo.setId(account.getId()); userInfo.setUsername(account.getUsername()); userInfo.setGroupNumber(account.getGroupNumber()); userInfo.setTime(System.currentTimeMillis()); // 注册该用户推送消息的通道 offlineInfoTransmitStatic.registerPull(channel); return true; }
当channel关闭时,就再也不接收消息。unregisterPull就是注销信息消费者,客户端再也不接取聊天消息。此外,从下方有一个加写锁的操做,就是为了不channel还在发送消息时,这边忽然关闭channel,这样会致使报错。app
public static void removeChannel(Channel channel) { try { logger.warn("channel will be remove, address is :{}", NettyUtil.parseChannelRemoteAddr(channel)); //加上读写锁保证移除channel时,避免channel关闭时,还有别的线程对其操做,形成错误 rwLock.writeLock().lock(); channel.close(); UserInfo userInfo = userInfos.get(channel); if (userInfo != null) { if (userInfo.isAuth()) { offlineInfoTransmitStatic.unregisterPull(channel); // 减去一个认证用户 userCount.decrementAndGet(); } userInfos.remove(channel); } } finally { rwLock.writeLock().unlock(); } }
为了无缝切换使用rabbitmq、rocketmq、activemq、不使用中间件存储和转发聊天消息这4种状态,定义以下4个接口。依次是发送单聊消息、群聊消息、客户端启动接收消息、客户端下线不接收消息。异步
public interface OfflineInfoTransmit { void pushP2P(Integer userId, String message); void pushGroup(String groupNumber, String message); void registerPull(Channel channel); void unregisterPull(Channel channel); }
其中,如何使用rabbitmq、rocketmq、activemq三种中间件中的一种来存储和转发聊天消息,它的处理流程以下:oop