在通讯场景中比较常见的模式为客户端发送请求给服务端,服务端再回以响应。还有一种通讯模式为服务端主动Push消息给客户端,这种通讯一般有两种场景。java
场景一
某个客户端发送指令给服务端,触发服务端push消息至其余客户端,例如:IM。 git
场景二
服务端基于某种业务场景主动Push消息至相连的客户端,例如:APP消息推送。 编程
本文以场景一为例演示如何经过smart-socket实现Push消息下发,首先咱们须要定义三个角色:session
通讯编程的首要步骤,则是定义通讯协议。出于演示目的,咱们采用length+data的协议格式,即采用4个字节长度的int值表示消息头,而该int数值的大小表明着消息体的长度。SendClient与PushServer,PushServer与ReceiverClient皆采用此协议通讯。socket
public class StringProtocol implements Protocol<String> { @Override public String decode(ByteBuffer readBuffer, AioSession<String> session) { int remaining = readBuffer.remaining(); if (remaining < Integer.BYTES) { return null; } readBuffer.mark(); int length = readBuffer.getInt(); if (length > readBuffer.remaining()) { readBuffer.reset(); return null; } byte[] b = new byte[length]; readBuffer.get(b); readBuffer.mark(); return new String(b); } }
PushServer的处理器须要具有如下几方面能力:ide
public class PushServerProcessorMessage implements MessageProcessor<String> { private static final Logger LOGGER = LoggerFactory.getLogger(PushServerProcessorMessage.class); private Map<String, AioSession<String>> sessionMap = new ConcurrentHashMap<>(); @Override public void process(AioSession<String> session, String msg) { LOGGER.info("收到SendClient发送的消息:{}", msg); byte[] bytes = msg.getBytes(); sessionMap.values().forEach(onlineSession -> { if (session == onlineSession) { return; } WriteBuffer writeBuffer = onlineSession.writeBuffer(); try { LOGGER.info("发送Push至ReceiverClient:{}", onlineSession.getSessionID()); writeBuffer.writeInt(bytes.length); writeBuffer.write(bytes); writeBuffer.flush(); } catch (Exception e) { LOGGER.error("Push消息异常", e); } }); } @Override public void stateEvent(AioSession<String> session, StateMachineEnum stateMachineEnum, Throwable throwable) { switch (stateMachineEnum) { case NEW_SESSION: LOGGER.info("与客户端:{} 创建链接", session.getSessionID()); sessionMap.put(session.getSessionID(), session); break; case SESSION_CLOSED: LOGGER.info("断开客户端链接: {}", session.getSessionID()); sessionMap.remove(session.getSessionID()); break; default: } } }
本文简化了消息接收者的处理逻辑,只是打印一行日志用于观察。实际应用中须要根据收到的消息执行一些业务逻辑。ui
public class PushClientProcessorMessage implements MessageProcessor<String> { private static final Logger LOGGER = LoggerFactory.getLogger(PushClientProcessorMessage.class); @Override public void process(AioSession<String> session, String msg) { LOGGER.info("ReceiverClient:{} 收到Push消息:{}", session.getSessionID(), msg); } @Override public void stateEvent(AioSession<String> session, StateMachineEnum stateMachineEnum, Throwable throwable) { } }
启动服务端:PushServer日志
public class PushServer { public static void main(String[] args) throws IOException { AioQuickServer<String> server = new AioQuickServer<>(8080, new StringProtocol(), new PushServerProcessorMessage()); server.start(); } }
启动接收者:ReceiverClientcode
public class ReceiverClient { public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r); } }); StringProtocol protocol = new StringProtocol(); PushClientProcessorMessage clientProcessorMessage = new PushClientProcessorMessage(); AioQuickClient<String>[] clients = new AioQuickClient[4]; for (int i = 0; i < clients.length; i++) { clients[i] = new AioQuickClient<>("localhost", 8080, protocol, clientProcessorMessage); clients[i].start(channelGroup); } } }
启动发送者:SenderClientserver
public class SenderClient { public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { StringProtocol protocol = new StringProtocol(); PushClientProcessorMessage clientProcessorMessage = new PushClientProcessorMessage(); AioQuickClient<String> clients = new AioQuickClient("localhost", 8080, protocol, clientProcessorMessage); AioSession<String> session = clients.start(); byte[] msg = "HelloWorld".getBytes(); while (true) { WriteBuffer writeBuffer = session.writeBuffer(); writeBuffer.writeInt(msg.length); writeBuffer.write(msg); writeBuffer.flush(); Thread.sleep(1000); } } }
SenderClient每秒中发送一条:“HelloWorld” 消息至 PushServer。观察 PushServer控制台 能够看到服务端接收到消息以后,会便可转发至 ReceiverClient。
而后再去观察 ReceiverClient控制台,则会打印服务端Push过来的消息。
本文经过一个简单的示例,演示了Push服务的实现原理。实际场景下还包括不少可靠性方面的问题须要考虑,感兴趣的读者可自行研究。
本文涉及到的示例代码可从smart-socket仓库中下载