Rmessage是采用Reactor3,基于reactor-netty项目构建的实时消息推送api。java
什么是Reactor3?react
Reactor 是一个用于JVM的彻底非阻塞的响应式编程框架,具有高效的需求管理(即对 “背压(backpressure)”的控制)能力。它与 Java 8 函数式 API 直接集成,好比 CompletableFuture, Stream, 以及 Duration。它提供了异步序列 API Flux(用于[N]个元素)和 Mono(用于 [0|1]个元素),并彻底遵循和实现了“响应式扩展规范”(Reactive Extensions Specification)。git
使用Reactor好处?github
很是容易构建高吞吐量纯异步的代码,还有就是可以无缝整合spring5[webflux]项目。web
使用Rmessage你须要外部管理群组用户关系,以及离线消息存储,Rmessage不提供持久化,测试能够使用默认Handler内存保留离线消息。
整个项目采用纯异步的编程思想去开发,旨在学习reactive programming。spring
ServerStart .builder() .tcp() .ip("127.0.0.1") .port(1888) .onReadIdle(10000l) //设置读心跳时间 .onWriteIdle(10000l) //设置写心跳时间 .option(ChannelOption.SO_RCVBUF,1023) .interceptor(frame -> frame,frame -> frame)// 拦截全部message .setAfterChannelInit(channel -> {// channel设置 }) .connect() .cast(TcpServerSession.class) .subscribe(session->{ session.addGroupHandler(groupId -> null).subscribe(); session.addOfflineHandler(new DefaultOffMessageHandler()).subscribe(); session.addUserHandler(new DefaultUserTransportHandler()); });
ClientStart .builder() .tcp() .ip("127.0.0.1") .port(1888) .userId("21344") //设置用户名 .password("12312") //设置密码 .onReadIdle(10000l,()->()->System.out.println("心跳了"))//设置读心跳,以及设置回调runner .setClientType(ClientType.Ios)//设置客户端类型 .setAfterChannelInit(channel -> { // channel设置 }) .connect() .cast(TcpClientSession.class) .subscribe(session->{ session.sendPoint("123","测试一下哦").subscribe(); //发送单聊消息 session.sendGroup("group1","123").subscribe(); // 发送群聊消息 session.accept(message -> { }); // 接受全部消息 });
Github地址 https://github.com/1ssqq1lxr/Rmessage编程