Websocket实现群聊功能

websocket是什么

WebSocket是web客户端和服务器之间新的通信方式, 依然架构在HTTP协议之上。使用WebSocket链接, web应用程序能够执行实时的交互, 而不是之前的poll方式。html

WebSocket是HTML5开始提供的一种在单个 TCP 链接上进行全双工通信的协议,能够用来建立快速的更大规模的健壮的高性能实时的web应用程序。WebSocket通讯协议于2011年被IETF定为标准RFC 6455,WebSocketAPI被W3C定为标准。
在WebSocket API中,浏览器和服务器只须要作一个握手的动做,而后,浏览器和服务器之间就造成了一条快速通道。二者之间就直接能够数据互相传送。前端

—— Java WebSocket教程java

实现websocket有哪些方式

springboot集成websocket

首先要在maven中引入相关的依赖github

<dependency>  
 <groupId>org.springframework.boot</groupId>  
 <artifactId>spring-boot-starter-websocket</artifactId>  
</dependency>

WebSockerServerEndpoint核心的方法

咱们使用注解方式Annotation-driven编写websocket客户端代码。经过在POJO加上注解, 开发者就能够处理WebSocket生命周期事件。web

首先在类上添加@ServerEndpoint注解:spring

  • value 定义访问 websocket 的路径
  • decoders 和 encoders 用于定义编解码类(下一小节会讲)

因为加了@ServerEndpoint,咱们须要实现与websocket生命周期相关的4个方法json

  • @OnOpen 表示刚创建链接时的操做,好比咱们要实现群聊功能,就要在此时将同一个群中的session都存起来
  • @OnMessage 表示创建链接以后接收到消息以后进行的操做
  • @OnClose 链接关闭时的操做,一样是上面的群聊,在关闭链接的时候就要将相应的session移除
  • @OnError 表示链接出现异常时的操做
@ServerEndpoint(  
        value \= "/chat-room/{conferenceId}/{userId}",  
  decoders \= { MessageDecoder.class },  
  encoders \= { ResponseMessageEncode.class })  
public class WebSockerServerEndpoint {  

  @OnOpen  
  public void openSession(@PathParam("conferenceId") String conferenceId,  @PathParam("userId") String userId,  
  Session session) {  
  
  }  
  
    @OnMessage  
  public void onMessage(@PathParam("conferenceId") String conferenceId,  @PathParam("userId") String userId,  
  Message message) {  
  
    }  
  
    @OnClose  
  public void onClose(@PathParam("userId") String userId, @PathParam("conferenceId") String conferenceId) {  
        
  }  
  
    @OnError  
  public void onError(Throwable t){  
        
  }  
  
}

编解码器

websocket默认接受String类型的数据,而咱们须要实现更加复杂的需求。在聊天的过程当中,咱们但愿可以发送图片、文件、消息,还但愿能有心跳保持链接,消息可以撤回等。这些仅仅经过String类型的数据是没法知足咱们的需求的,因此要用更加复杂的类来实现,这就涉及到了encoder/decoder标准通讯过程。通讯的格式是能够咱们本身定义的,能够使用xml或者json等,下面演示json格式的编解码。segmentfault

decoders 解码

定义Message类和相应的解码类,MessageDecode:后端

public class Message {  
    private String id;  
  
 private String conferenceId;  
  
 private String type;  
  
 private String content;  
  
 private String fileId;
 ...
import com.fasterxml.jackson.databind.ObjectMapper;  
  
import javax.websocket.DecodeException;  
import javax.websocket.Decoder;  
import javax.websocket.EndpointConfig;  
import java.io.IOException;  
  
public class MessageDecoder implements Decoder.Text<Message> {  
    @Override  
  public Message decode(String jsonMessage) throws DecodeException {  
        ObjectMapper mapper = new ObjectMapper();  
  Message message = null;  
 try {  
            message = mapper.readValue(jsonMessage, Message.class);  
  } catch (IOException e) {  
            e.printStackTrace();  
  }  
        return message;  
  }  
  
    @Override  
  public boolean willDecode(String jsonMessage) {  
        ObjectMapper mapper = new ObjectMapper();  
 try {  
            mapper.readValue(jsonMessage, Message.class);  
 return true;  } catch (IOException e) {  
            return false;  
  }  
    }  
  
    @Override  
  public void init(EndpointConfig endpointConfig) {  
  
    }  
  
    @Override  
  public void destroy() {  
  
    }  
}

encoders 编码

将处理好的消息再次编码,定义要给 ResponseMessage 和 ResponseMessageEncode 两个类

public class ResponseMessage {  
  
    private String id;  
  
 private Account fromUser;  
  
 private String type;  
  
 private String content;  
  
 private ConferenceFile file;  
  
  @JsonFormat(pattern \= "yyyy-MM-dd HH:mm:ss")  
    private Date createTime;
import com.fasterxml.jackson.core.JsonProcessingException;  
import com.fasterxml.jackson.databind.ObjectMapper;  
  
import javax.websocket.EncodeException;  
import javax.websocket.Encoder;  
import javax.websocket.EndpointConfig;  
   
public class ResponseMessageEncode implements Encoder.Text<ResponseMessage> {  
    @Override  
  public String encode(ResponseMessage responseMessage) throws EncodeException {  
        ObjectMapper mapper = new ObjectMapper();  
  String json = null;  
 try {  
            json = mapper.writeValueAsString(responseMessage);  
  } catch (JsonProcessingException e) {  
            e.printStackTrace();  
  }  
  
        return json;  
  }  
  
    @Override  
  public void init(EndpointConfig endpointConfig) {  
  
    }  
  
    @Override  
  public void destroy() {  
  
    }  
}

功能实现

点对点发送消息

private void sendText(ResponseMessage responseMessage, Session session) {  
  
    RemoteEndpoint.Basic basic = session.getBasicRemote();  
  
 try {  
        basic.sendObject(responseMessage);  
  } catch (IOException e) {  
        e.printStackTrace();  
  } catch (EncodeException e) {  
        e.printStackTrace();  
  }  
}

群发消息

private void sendTextAll(ResponseMessage responseMessage, String conferenceId) {  
    if(livingSessions.containsKey(conferenceId)){  
        HashMap<String, Session> sessionMap = livingSessions.get(conferenceId);  
  sessionMap.forEach((sessionId, session) -> {  
            sendText(responseMessage, session);  
  });  
  }  
}

心跳检测

@OnMessage  
public void onMessage(@PathParam("conferenceId") String conferenceId,  
  @PathParam("userId") String userId,  
  Message message) {  
  
    try{  
        ResponseMessage responseMessage = new ResponseMessage();  
  
 if(MessageTypeEnum.HEART.equals(message.getType()) && "ping".equals(message.getContent())){  
            this.heartMessage();  
 return;  }
private void heartMessage() {  
    ResponseMessage responseMessage = new ResponseMessage();  
  responseMessage.setType(MessageTypeEnum.HEART.name());  
  responseMessage.setContent("pong");  
  sendText(responseMessage, this.session);  
}

消息撤回

消息撤回要经过广播被撤回的 message_id 来实现,由前端向后端传递一个被撤回的 message_id,后端删除消息并进行广播。前端接收到撤回消息的广播以后,将相应 message_id 的消息删除便可。

题外话-静态注入

spring的bean都是单例(singleton)的,websocket是多实例单线程的。websocket中的对象在@Autowried时,会在应用启动时注入一次,以后建立的websocket对象都不会注入service,因此websocket中注入的的bean会是null。

能够用下面这样静态注入的方式,在应用启动的时候注入bean,因为是静态变量,能够供全部websocket对象使用。

private static IMessageService messageService;

@Autowired  
public void setMessageService(MessageServiceImpl messageService){  
    WebSockerServerEndpoint.messageService \= messageService;  
}
相关文章
相关标签/搜索