1、报文队列的处理:java
若是将多路复用器获取到的全部事件,阻塞式的同步处理,那恐怕会严重影响selector的性能,因此我把从客户端接收到的大部分消息,都放入了队列中,而后另外启动队列的消费线程对消息进行异步的处理;具体以下:异步
1.通信报文队列消费者:在selector对read事件的处理过程当中,我在最后都把客户端发送的报文放入了一个叫CQUEUE的队列中,具体定义以下,CQUEUE是全部客户端发送报文的队列,在CQUEUE队列中的消费者线程中,我又对M类报文进行了对垒处理,放入了另外一个队列MQUEUE。socket
public class GVQueue { //通信级别报文的队列 public static BlockingQueue CQUEUE = new ArrayBlockingQueue<Packet>(3000); //短消息级别报文的队列 public static BlockingQueue MQUEUE = new ArrayBlockingQueue<Packet>(3000); }
2.CQUEUE队列的消费者线程,专门针对通信层面的消息进行处理,好比:客户端链路维护的回应等;以下:性能
public class CQueueConsumer extends Thread { private int waitTime; private static Logger logger = LogManager.getLogger(CQueueConsumer.class.getName()); public CQueueConsumer(int waitTime) { this.waitTime = waitTime; } public void run() { logger.info("通信队列消费者线程启动……"); boolean isRunning = true; try { while (isRunning) { IPacket packet = (IPacket) GVQueue.CQUEUE.poll(1, TimeUnit.SECONDS); if (packet != null) { handleQueue(packet); /* if (logger.isDebugEnabled()) { logger.debug("读出消息队列收到的客户端消息:" + packet.getPacketStr()); }*/ logger.debug("读出消息队列收到的客户端消息:" + packet.getPacketStr()); } else { Thread.sleep(waitTime); if (logger.isDebugEnabled()) { logger.debug("消息队列中没有消息,休息一下子……"); } } } } catch (InterruptedException e) { logger.info("通信队列消费者处理线程终止……"); e.printStackTrace(); } } /** * 通信层处理(对除了M报文以外的报文进行处理) * @param packet */ private void handleQueue(IPacket packet) { //若是是短消息类报文,则直接放入短消息队列等待短消息消费者处理; if (packet.getHeader().equals(MsgPacket.HEADER)){ GVQueue.MQUEUE.offer(packet); } if (!packet.getHeader().equals(ReplyPacket.HEADER)) { //须要更新通道的最后访问时间 GVConnection gvConn = GVConnTools.getConnByToken(packet.getClientToken()); if (gvConn!=null){ //更改最后访问时间 GVConnTools.updLastAccessTime(packet.getClientToken(),CommonTools.systemTimeUtc()); SocketChannel socketChannel = gvConn.getChannel(); //对客户端的报文作出R相应 if (socketChannel != null) { ReplyOrder replyOrder = (ReplyOrder)Config.getOrderInstance(ReplyOrder.HEADER); replyOrder.initReplyOrder(packet.getRid()); GVServer.write2Client(replyOrder, socketChannel); } } } } }
3.而MQUEUE队列的消费者线程,则专门针对M类报文进行处理,它的工做是拿出M报文,找到目标通道,而后将报文内容转入目标通道(目前离线存储还没有实现)。以下:this
public class MQueueConsumer extends Thread { private static Logger logger = LogManager.getLogger(MQueueConsumer.class.getName()); public void run() { logger.info("短消息队列消费者线程启动……"); while (true) { try { Packet packet = (Packet) GVQueue.MQUEUE.poll(1, TimeUnit.SECONDS); if (packet != null) { // Logs.info("读出消息队列收到的客户端消息:" + packet.getPacketStr()); MsgInfo msgInfo = new MsgInfo(); msgInfo = msgInfo.generaterMsgInfo(packet.getPacketBody()); SocketChannel channel = GVConnTools.getChannelByUserId(msgInfo.getReceiver()); if(channel!=null && channel.isOpen()) { MsgOrder msgOrder = (MsgOrder) Config.getOrderInstance(MsgOrder.HEADER); msgOrder.initMsgOrder(packet.getPacketBody()); GVServer.write2Client(msgOrder, channel); if (logger.isDebugEnabled()) { logger.debug("短消息发送至:<" + msgInfo.getReceiver() + ">"); } }else{ /* 此处将数据放入离线存储队列 */ if(logger.isDebugEnabled()) { logger.debug("短消息放入离线短消息队列:<" + msgInfo.getReceiver() + ">"); } } } else { Thread.sleep(200); if(logger.isDebugEnabled()) { logger.debug("消息队列中没有消息,休息一下子……"); } } } catch (InterruptedException e) { logger.info("短消息队列消费者处理线程终止……"); e.printStackTrace(); } } } }