springboot使用netty-socketio推送消息

前言

背景

最近被分配了一个站内信模块,由本身单独负责这个模块;这个模块主要功能就是提供一个接口给调用方,而后将传送的消息推送至登陆的相关的用户的客户端;而后就是用户对这条消息的操做了,就是写一些curd的接口供前端调用;html

技术选用

因为以前用netty作过一个项目,并且一位大佬也写了不少关于netty的文章,第一时间就想到去看他写的设计一个百万级的消息推送系统; 而后仔细对比了一下,我负责这个模块:前端

  • 用户量不大,由于针对的是运维人员,并且不是全部运维人,是有针对性的;
  • 不用安全验证,由于这个项目是在内网中运行;
  • 这个模块不用分布式,只是一个微服务中的一部分;

最后选用了netty-socketio这个框架;并且网上的文章也很多;java

正文

springboot整合netty-socketio

pom

首先导入包,我导入的版本是1.7.11;我最开始导入的是跟前边的一个版本,可是出现了一个问题,就是OnEvent事件没法监听,因此我换了更高的版本,而后就能够了;git

<dependency>
            <groupId>com.corundumstudio.socketio</groupId>
            <artifactId>netty-socketio</artifactId>
            <version>1.7.11</version>
        </dependency>

复制代码

整合

这个整合和以前的springboot整合netty同样的;github

  1. 实现CommandLineRunner 接口,在springboot启动前启动服务
  2. 使用@Configuration注解,将服务以bean的方式启动
  3. 实现springboot中各类接口,如:实现InitializingBean接口,*Aware接口类等;

一句话总结起来就是netty的启动是以注入的方式启动,而不是以new的方式;固然也能够以new的方式启动,只是这样的话就没法直接以注入的方式调用其余类了;web

新建ChatServer类

该类主要是启动服务; 我这里使用了实现InitializingBean接口启动服务,类上注意声明@Component,固然也能够用其余的方式;只要springboot在启动时能扫描到这个类就好了;spring

@Component
public class PushServer implements InitializingBean {
    @Resource
    private EventListenner eventListenner;

    @Value("${push.server.port}")
    private int serverPort;
    @Override
    public void afterPropertiesSet() throws Exception {
        Configuration config = new Configuration();
        config.setPort(serverPort);

        SocketConfig socketConfig = new SocketConfig();
        socketConfig.setReuseAddress(true);
        socketConfig.setTcpNoDelay(true);
        socketConfig.setSoLinger(0);
        config.setSocketConfig(socketConfig);
        config.setHostname("localhost");

        SocketIOServer server = new SocketIOServer(config);
        server.addListeners(eventListenner);
        server.start();
        System.out.println("启动正常");
    }
}


复制代码

新建EventListennerlei

该类主要是监听客户端的链接及断开,而后进行处理; 在这里,我对请求地址附带了用户的ID;后端

@Component
public class EventListenner {
    @Resource
    private ClientCache clientCache;
    /** * 客户端链接 * @param client */
    @OnConnect
    public void onConnect(SocketIOClient client) {
        String userId = client.getHandshakeData().getSingleUrlParam("userId");
        UUID sessionId = client.getSessionId();
        clientCache.saveClient(userId,sessionId,client);
        System.out.println("创建链接");
    }
    /** * 客户端断开 * @param client */
    @OnDisconnect
    public void onDisconnect(SocketIOClient client) {
        String userId = client.getHandshakeData().getSingleUrlParam("userId");
        clientCache.deleteSessionClient(userId,client.getSessionId());
        System.out.println("关闭链接");
    }
    //消息接收入口,当接收到消息后,查找发送目标客户端,而且向该客户端发送消息,且给本身发送消息
    // 暂未使用
    @OnEvent("messageevent")
    public void onEvent(SocketIOClient client, AckRequest request) {
    }
}

复制代码

缓存

因为须要向指定用户推送消息,因此须要将链接信息与用户绑定,因此前端在登录的同时须要发送用户ID;而后在链接事件的监听中将链接信息与用户绑定;链接信息与用户实现一对一对应,可是这样会出现一个问题,当用户打开多个页面时,新开的页面通道链接会将旧页面的通道链接信息覆盖,形成没法所有页面推送,因此将用户与通道信息改为一对多的关系; 如何将用户与通道信息保存,我这里使用了两个map集合:缓存

  • 里层map存储页面sessionID对应的通道信息,以sessionID为key,通道链接信息为value
  • 外层map存储用户ID对应的里层数据,以用户ID为key,里层数据为value;

客户端断开时则须要用户ID及sessid; 代码以下:安全

@Component
public class ClientCache {

    //本地缓存
    private static Map<String, HashMap<UUID, SocketIOClient>> concurrentHashMap=new ConcurrentHashMap<>();
    /** * 存入本地缓存 * @param userId 用户ID * @param sessionId 页面sessionID * @param socketIOClient 页面对应的通道链接信息 */
    public void saveClient(String userId, UUID sessionId,SocketIOClient socketIOClient){
        HashMap<UUID, SocketIOClient> sessionIdClientCache=concurrentHashMap.get(userId);
        if(sessionIdClientCache==null){
            sessionIdClientCache = new HashMap<>();
        }
        sessionIdClientCache.put(sessionId,socketIOClient);
        concurrentHashMap.put(userId,sessionIdClientCache);
    }
    /** * 根据用户ID获取全部通道信息 * @param userId * @return */
    public HashMap<UUID, SocketIOClient> getUserClient(String userId){
        return concurrentHashMap.get(userId);
    }
    /** * 根据用户ID及页面sessionID删除页面连接信息 * @param userId * @param sessionId */
    public void deleteSessionClient(String userId,UUID sessionId){
        concurrentHashMap.get(userId).remove(sessionId);
    }
}

复制代码

推送接口PushController类

该类主要是提供给别人调用,向用户推送消息,这里就贴直接推送的代码了,具体的推送业务就不贴出来了;这里须要注意,推送事件的命名须要与web端监听命名一致;

@RestController
@RequestMapping("/push")
public class PushController {
    @Resource
    private ClientCache clientCache;

    @GetMapping("/user/{userId}")
    public String pushTuUser(@PathVariable("userId") String userId){
        HashMap<UUID, SocketIOClient> userClient = clientCache.getUserClient(userId);
        userClient.forEach((uuid, socketIOClient) -> {
            //向客户端推送消息
            socketIOClient.sendEvent("chatevent","服务端推送消息");
        });
        return "success";
    }
}


复制代码

客户端(web端)

因为web端代码及依赖较多;就不提供代码;能够去官网下载:github.com/mrniko/nett… 下载下来后,打开client文件下的index.html;我修改的地方:

注意事件的监听中的名称必定要与后端相对应,否则没法监听;

运行结果

  1. 客户端 这里同时打开三个页面

  1. 服务端

  1. 推送消息

调用推送接口,三个页面同时收到推送消息

GitHub地址

总结

  1. 在刚开始设计时,我设计用户与通道的关系就是一对一的关系,最后在提交流程图的时候,项目组的各位大佬都给我提了许多建议,果真姜仍是老的辣啊,在这里十分感谢项目组的各位大佬提出的宝贵的建议;
  2. 这是新年的第一篇博客;也是新的一个起点,但愿本身能坚持把博客写下去;
相关文章
相关标签/搜索