Spring4.3+Webscket 实现聊天、消息推送详解之具体实现(三)

前面可能废话太多了,下面进入正题,讲解spring具体的实现方式。javascript

先来讲一下几个核心的步骤吧: 1.构建websocket服务端业务处理程序 2.构建websocket服务端自身配置程序 3.注册websocket服务端、配置、拦截过滤器等 3.客户端发送握手请求并创建链接``` 这里输入代码html

4.客户端发送消息给服务端
5.服务端处理客户端发送来的消息
6.服务端发送消息给客户端(指定或者群发)
7.客户端展示。


所需jar包如图:
https://static.oschina.net/uploads/img/201609/12105945_t83F.png 

第一步:构建websocket
spring实现方式中,有一个核心的接口WebSocketHandler,此接口中定义了websokcet服务端核心的几个方法,如用户成功链接服务端方法afterConnectionEstablished、处理客户端发送消息的方法handleTextMessage等等。查看源码发现其有一个重要的实现类:.AbstractWebSocketHandler该类是一个抽象类,其有两个实现类分别为:BinaryWebSocketHandler, TextWebSocketHandler。BinaryWebSocketHandler此类中定义了特殊的方法handleBinaryMessage(WebSocketSession session, BinaryMessage message),看其源码中,BinaryMessage 构造函数里面,能够传入一个字节形的数组,经过集成该类可用次方法用于附件上传等功能。

通常信息传递,可经过继承TextWebSocketHandler类来实现。下面先贴出代码:


websockethandler处理程序。

package cn.com.mt.websocket;前端

import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.Map;java

import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler;jquery

import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject;web

import cn.com.mt.model.Common_Constant; import cn.com.mt.model.UserInfo; /**redis

  • 该类为websocket服务端处理程序,你能够把他广义上理解为这就是websocket的服务端处理程序
  • @author 寒冰
  • QQ群号: 67746867

*/ public class ContextWebSocketHandler extends TextWebSocketHandler{spring

/*
 * 用于存放在线用户集合,key用用户的id,真实生产环境中,未避免大数据出现的性能、内存、速度等一系列问题,采起内存数据库如redis存储
 */
private static final Map<String,WebSocketSession> users;
static {
	users = new HashMap<String,WebSocketSession>();
}


/**
 * 当客户端有消息发送过来时,会进入此方法进行处理
 * session:消息来源者
 * message:封装了发送过来的消息信息
 * 说明:再此处说明一下,消息格式最好以下:{to:消息接收者ID/Username惟一标识/服务端标识...,content:消息内容},方便业务操做
 */
@Override
protected void handleTextMessage(WebSocketSession session,
		TextMessage message) throws Exception {
	super.handleTextMessage(session, message);
	//若是想取得原始session中封存的用户身份等信息,可经过:session.getAttributes().get(名称)方法取得 
	UserInfo user = (UserInfo)session.getAttributes().get(Common_Constant.USER_INFO);
	
	//经过message.getPayload()方法获取客户端发送过来的有效信息
	String content = message.getPayload();
	JSONObject obj = JSON.parseObject(content);
	//取得标识令牌
	String token = obj.getString("to");
	if("server".equals(token)){//发送过来的信息为服务器接收的系统信息,则处理相关系统级业务逻辑,此处只作原样信息返回
		TextMessage returnMessage = new TextMessage("{'token':'socket_info','fromUser':'server','content':'信息已经收到,内容为:"+obj.getString("content")+"'}");
		session.sendMessage(returnMessage);
	}else if("all".equals(token)){//发送过来的信息为广播信息,则调用所有在线人员接收信息方法
		TextMessage returnMessage = new TextMessage("{'token':'broadcast','fromUser':'"+user.getUsername()+"','content':'"+obj.getString("content")+"'}");
		sendMessageToUsers(returnMessage);
	}else{//发给我的接收
		TextMessage returnMessage = new TextMessage("{'token':'info','fromUser':'"+user.getUsername()+"','toUser':'"+obj.getString("toUser")+"','content':'"+obj.getString("content")+"'}");
		sendMessageToUser(obj.getString("to"),returnMessage);
	}
	
}

/**
 * 用户成功链接成功后会调用该方法
 * 说明:次方法中,咱们在实际生产环境中可能用到的场景如:
 * 1.用户上线以后,接收自身的离线消息。
 * 2.刷新全局在线用户列表
 */
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
		
		UserInfo user = (UserInfo)session.getAttributes().get(Common_Constant.USER_INFO);
		if(null != user){
			users.put(user.getUsername(), session);//放入在线用户集合中
		}
		//发送给当前登陆用户在线用户列表信息
		sendMessageToUsers(new TextMessage("{'token':'refreshlines','list':"+getlineUsers()+"}"));
	
    }


/**
 * 给某个用户发送消息
 *
 * @param userName
 * @param message
 */
public void sendMessageToUser(String username, TextMessage message) {
	
	WebSocketSession user = users.get(username);
	/*
	 * 断定接收信息的一方是否存在而且在线状态,若是在线就发送
	 * 说明:在这个地方,真实的应用里面,处理逻辑应该是先在当前在线用户列表里面,先取指定的用户是否在线,若是不在线,则再去数据库中查找该用户是否存在,若是存在,则该消息
	 * 将加入离线消息存储逻辑处理,待该用户上线时,服务器推送离线消息给该用户。
	 */
	if(null != user && user.isOpen()){
		 try {
			user.sendMessage(message);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
    
}


/**
 * 给全部在线用户发送消息
 *
 * @param message
 */
public void sendMessageToUsers(TextMessage message) {
	
	for (Map.Entry<String,WebSocketSession> entry : users.entrySet()) {  
		  
		 try {
             if (entry.getValue().isOpen()) {
            	 entry.getValue().sendMessage(message);
             }
         } catch (IOException e) {
             e.printStackTrace();
         }
	  
	}  
    
}

/**
 * 返回在线用户列表,json字符串格式
 * @return
 */
public String getlineUsers(){
	return JSON.toJSONString(users.keySet());
}


@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
    if(session.isOpen()){
        session.close();
    }
   System.out.println("传输处理错误......");
    users.remove(session);
}

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
	System.out.println("websocket connection closed......");
    users.remove(session);
}

@Override
public boolean supportsPartialMessages() {
    return false;
}

}数据库

握手拦截器经过继承HttpSessionHandshakeInterceptor实现,以下代码:

package cn.com.mt.websocket;json

import java.util.Map;

import javax.servlet.http.HttpSession;

import org.springframework.http.server.ServerHttpRequest;   import org.springframework.http.server.ServerHttpResponse; import org.springframework.http.server.ServletServerHttpRequest; import org.springframework.web.socket.WebSocketHandler;   import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;

import cn.com.mt.model.Common_Constant; import cn.com.mt.model.UserInfo;   /**  * HandShakeInterceptor是websocket握手拦截器,用于拦截websocket初始化链接的请求   * @author 寒冰  * QQ群号: 67746867  *  */ public class HandshakeInterceptor extends HttpSessionHandshakeInterceptor{  

    /**      * 握手以前调用该方法      */     @Override     public boolean beforeHandshake(ServerHttpRequest request,             ServerHttpResponse response, WebSocketHandler wsHandler,             Map<String, Object> attributes) throws Exception {         System.out.println("握手开始");         if (request instanceof ServletServerHttpRequest) {             ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;             HttpSession session = servletRequest.getServletRequest().getSession(false);             if (session != null) {                 //握手之初,封装一下当前用户的信息,为的是在后面的消息处理逻辑中,取出session中封装的用户信息                 UserInfo user = (UserInfo) session.getAttribute(Common_Constant.USER_INFO);                 if (user==null) {                      attributes.put(Common_Constant.USER_INFO,null);                 }else{                     attributes.put(Common_Constant.USER_INFO,user);                 }                                               }         }         return super.beforeHandshake(request, response, wsHandler, attributes);     }

    @Override     public void afterHandshake(ServerHttpRequest request,             ServerHttpResponse response, WebSocketHandler wsHandler,             Exception ex) {         System.out.println("握手完成");         super.afterHandshake(request, response, wsHandler, ex);     }

   }  

经过xml注册websocket相关信息以下:
spring-websocket.xml

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"     xmlns:websocket="http://www.springframework.org/schema/websocket"     xmlns:context="http://www.springframework.org/schema/context"      xmlns:mvc="http://www.springframework.org/schema/mvc"     xsi:schemaLocation="         http://www.springframework.org/schema/beans         http://www.springframework.org/schema/beans/spring-beans.xsd         http://www.springframework.org/schema/websocket         http://www.springframework.org/schema/websocket/spring-websocket.xsd         http://www.springframework.org/schema/context         http://www.springframework.org/schema/context/spring-context.xsd         http://www.springframework.org/schema/mvc          http://www.springframework.org/schema/mvc/spring-mvc.xsd">              <!-- 声明服务端处理程序 -->     <bean id="contextwebsocketHandler" class="cn.com.mt.websocket.ContextWebSocketHandler"/>            <!-- 注册服务端处理程序路径及相关握手拦截器 -->      websocket:handlers         <websocket:mapping path="/chandler" handler="contextwebsocketHandler"/>        websocket:handshake-interceptors            <bean class="cn.com.mt.websocket.HandshakeInterceptor"></bean>        </websocket:handshake-interceptors>     </websocket:handlers>          <!-- 注册服务端处理程序路径及相关握手拦截器  开启sockjs方式,开启以后,前端必须用sockjs方式发起请求链接-->     websocket:handlers         <websocket:mapping path="/jhandler" handler="contextwebsocketHandler"/>        websocket:handshake-interceptors            <bean class="cn.com.mt.websocket.HandshakeInterceptor"></bean>        </websocket:handshake-interceptors>        websocket:sockjs/     </websocket:handlers>

    <!-- 配置websocket消息的最大缓冲区长度 -->     <bean class="org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean">         <property name="maxTextMessageBufferSize" value="8192"/>         <property name="maxBinaryMessageBufferSize" value="8192"/>     </bean>   

    </beans>

上述的代码中,用到了两个本身写的类,一个是实体类UserInfo,一个是用于定义常量的类,以下:
UserInfo:

package cn.com.mt.model; /**  * 用户信息实体  *@author 寒冰  *QQ群号: 67746867  *  */ public class UserInfo {

    private int id;          private String username;     

    public int getId() {         return id;     }

    public void setId(int id) {         this.id = id;     }

    public String getUsername() {         return username;     }

    public void setUsername(String username) {         this.username = username;     }

          }

Common_Constant:

package cn.com.mt.model; /**  * 系通通一常量定义实体  * @author 寒冰  * QQ群号: 67746867  *  */ public class Common_Constant {

    public static final String USER_INFO = "USER_INFO";//用户身份信息           }

同时为了记录登录用户信息,写了一个action,LoginAction,代码以下:

package cn.com.mt.action;

import javax.servlet.http.HttpServletRequest;

import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping;

import cn.com.mt.model.Common_Constant; import cn.com.mt.model.UserInfo;

/**  * 登录action  *@author 寒冰  *QQ群号: 67746867  *  */ @Controller public class LoginAction {

    /**      * 登录验证      * 说明:此处仅为阐明websocket在线用户业务,不作用户身份信息认证声明。      * 特别说明:如作集群部署,如需解决session同步问题,需作session共享。      * @param userinfo      * @param request      * @return      */     @RequestMapping("/login.action")     public String login(UserInfo userinfo,String index,HttpServletRequest request){         request.getSession().setAttribute(Common_Constant.USER_INFO, userinfo);                  return "0".equals(index)?"index0":"index1";     } }

为了可以websocketSession与httpsession能挂上关联,需额外添加请求拦截器RequestListener,让每一次的请求都带着session信息,以下实现:

package cn.com.mt.listener;

import javax.servlet.ServletRequestEvent; import javax.servlet.ServletRequestListener; import javax.servlet.http.HttpServletRequest; /**  * 用户请求/响应监听器  *   * 此监听器的主要做用是让用户每次向websocket发出的请求,都带上session信息  * @author 寒冰  * QQ群号: 67746867  *  */ public class RequestListener implements ServletRequestListener {

    @Override     public void requestInitialized(ServletRequestEvent request) {         // TODO Auto-generated method stub          ((HttpServletRequest) request.getServletRequest()).getSession();     }          @Override     public void requestDestroyed(ServletRequestEvent arg0) {         // TODO Auto-generated method stub              }

}

最后贴出web.xml 配置文件:

<?xml version="1.0" encoding="UTF-8"?>

<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" id="WebApp_ID" version="3.0">               <display-name>yao_yan</display-name>   <servlet>         <servlet-name>spring</servlet-name>         <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>         <init-param>             <param-name>contextConfigLocation</param-name>             <param-value>classpath*:spring-*.xml</param-value>         </init-param>         <load-on-startup>1</load-on-startup>         <async-supported>true</async-supported>     </servlet>

    <!-- websocket URL标识 -->     <servlet-mapping>         <servlet-name>spring</servlet-name>         <url-pattern>/socket/</url-pattern>     </servlet-mapping>          <!-- action URL标识 -->     <servlet-mapping>         <servlet-name>spring</servlet-name>         <url-pattern>.action</url-pattern>     </servlet-mapping>                    <!-- 注册请求监听器 -->     <listener>          <listener-class>cn.com.mt.listener.RequestListener</listener-class>     </listener> </web-app>

前端登录login.jsp:

<%@ page language="java" contentType="text/html; charset=UTF-8"     pageEncoding="UTF-8"%>

<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">

<html> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <title>Insert title here</title> <script type="text/javascript" src="js/jquery-1.10.2.min.js"></script> <script>  $(function(){            $("#login").click(function(){          var username = $.trim($("#username").val());          if(""==username){              alert("enter username please.....");              return ;          }else{              f1.submit();          }      });  }); </script> </head> <body> <form action="login.action" method="post" name="f1"> username:<input type="text" name="username" id="username"> <input type="radio" name="index" value="0" checked="checked">普通方式 <input type="radio" name="index" value="1">sockjs方式 <input type="button" id="login" value="login"> </form>

</body> </html> ```

普通方式jsp:

<%@ page language="java" contentType="text/html; charset=UTF-8"
    pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>Insert title here</title>
<script type="text/javascript" src="js/jquery-1.10.2.min.js"></script>

<script>

    var socket_url = "ws://192.168.1.107:8888/yao_yan/socket/chandler";
    
    var  websocket ;

   $(function(){
       
       //链接websocket服务器
       $("#connect").click(function(){
           
           websocket = new WebSocket(socket_url); 
           websocket.onopen = function (evt) { onOpen(evt) }; 
           websocket.onclose = function (evt) { onClose(evt) }; 
           websocket.onmessage = function (evt) { onMessage(evt) }; 
           websocket.onerror = function (evt) { onError(evt) }; 
           
       });
       
       //注册发送消息按钮事件
       $("#send").click(function(){
           var who = $("input[name='who']:checked").val();//获取消息接收方类别
           var content = $.trim($("#content").val());//获取消息内容
           var msg = "{";
           
           if(""==content){
               alert("输入消息内容!");
               return;
           }
           if("server"==who){
               msg+="'to':'server',";
               $("#msg").after("<p>你对服务器说:"+content+"</p>");
           }else if("all"==who){
               
               msg+="'to':'all',";
               $("#msg").after("<p>你对你们说:"+content+"</p>");
           }else if("one"==who){
               
               var to_user = $.trim($("#userlist").val());
               if(""==to_user){
                  alert("选择接收消息方!");
                  return ;
               }
               msg+="'to':'"+to_user+"',";
               
               $("#msg").after("<p>你对"+to_user+"说:"+content+"</p>");
           }
           msg += "'content':'"+content+"'}";
           
           sendd(msg);
           
       });
       
       
      
   });
   
    //链接成功websocket服务器时执行
       function onOpen(evt) { 
           $("#msg").text("<p>成功链接到websocket服务器</p>");
           $("#send").removeAttr("disabled");//发送信息按钮可用
           $("#connect").attr("disabled","disabled");//链接服务器按钮不可用
    } 
    
    //链接关闭时执行
    function onClose(evt) { 
       console.log("链接关闭。。。。"); 
    } 
    
    //服务器有推送消息过来时执行
    function onMessage(evt) {
        
       var obj = eval('(' + evt.data + ')');
       if(obj.token=="refreshlines"){//断定token为刷新在线用户列表
           refresh_line_users(obj.list);
       }else if(obj.token=="broadcast"){//广播消息
           $("#msg").after("<p>"+obj.fromUser+" 对你们说:"+obj.content+"</p>");
       }else if(obj.token=="info"){//私信
           $("#msg").after("<p>"+obj.fromUser+" 对你说:"+obj.content+"</p>");
       }else if(obj.token=="socket_info"){//系统提醒消息
           $("#msg").after("<p>"+obj.fromUser+" 响应消息:"+obj.content+"</p>");
       }
    } 
    
    //有错误信息时执行
    function onError(evt) { 
       console.log('Error occured: ' + evt.data); 
    }
    
    //向服务器发送消息,此方式实际生产中考虑消息加密传递
    function sendd(msg){
        websocket.send(msg);
        
    }
    
    //刷新在线用户列表
    function refresh_line_users(data){
        
        $("#userlist").empty();
        for(var i=0;i<data.length;i++){
            $("#userlist").append("<option value='"+$.trim(data[i])+"'>"+$.trim(data[i])+"</option>");
        }
        $("#userlist").append("<option value='all'>所有</option>");
        
    }
</script>
</head>
<body>
<h1>普通请求方式</h1>

        响应提示区域:<div id="msg" style="color:red"></div>
        <hr>
        
        
        <br>
        消息内容:
        <input type="text" id="content" name="content">
        <br><br>
         在线列表:
         <select id="userlist">
         </select>
        <br><br>
        我要发消息给:<input type="radio" id="server" name="who" value="server" checked="checked">服务器识别消息
        <input type="radio" id="one" name="who" value="one">我的
        <input type="radio" id="all" name="who" value="all">广播
        <br><br>
        <input type="button" id="connect" value="链接websocket服务器">
        <input type="button" id="send" value="发送" disabled="disabled">
        
</body>
</html>

sockjs方式:

<%@ page language="java" contentType="text/html; charset=UTF-8"
    pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>Insert title here</title>
<script type="text/javascript" src="js/jquery-1.10.2.min.js"></script>
<script src="js/sockjs-1.1.1.min.js"></script>  
<script>

    var socket_url = "http://192.168.1.107:8888/yao_yan/socket/jhandler";
    
    var  sock ;

   $(function(){
       
       //链接websocket服务器
       $("#connect").click(function(){
           
           sock = new SockJS(socket_url);
           sock.onopen = function (evt) { onOpen(evt) }; 
           sock.onclose = function (evt) { onClose(evt) }; 
           sock.onmessage = function (evt) { onMessage(evt) }; 
           sock.onerror = function (evt) { onError(evt) }; 
           
       });
       
       //注册发送消息按钮事件
       $("#send").click(function(){
           var who = $("input[name='who']:checked").val();//获取消息接收方类别
           var content = $.trim($("#content").val());//获取消息内容
           var msg = "{";
           
           if(""==content){
               alert("输入消息内容!");
               return;
           }
           if("server"==who){
               msg+="'to':'server',";
               $("#msg").after("<p>你对服务器说:"+content+"</p>");
           }else if("all"==who){
               
               msg+="'to':'all',";
               $("#msg").after("<p>你对你们说:"+content+"</p>");
           }else if("one"==who){
               
               var to_user = $.trim($("#userlist").val());
               if(""==to_user){
                  alert("选择接收消息方!");
                  return ;
               }
               msg+="'to':'"+to_user+"',";
               
               $("#msg").after("<p>你对"+to_user+"说:"+content+"</p>");
           }
           msg += "'content':'"+content+"'}";
           
           sendd(msg);
           
       });
       
       
      
   });
   
    //链接成功websocket服务器时执行
       function onOpen(evt) { 
           $("#msg").after("<font color='red'>系统提示:</font><p>你已经成功链接到websocket服务器!!!如今能够聊天了</p>");
           $("#send").removeAttr("disabled");//发送信息按钮可用
           $("#connect").attr("disabled","disabled");//链接服务器按钮不可用
    } 
    
    //链接关闭时执行
    function onClose(evt) { 
       console.log("链接关闭。。。。"); 
    } 
    
    //服务器有推送消息过来时执行
    function onMessage(evt) {
        
       var obj = eval('(' + evt.data + ')');
       if(obj.token=="refreshlines"){//断定token为刷新在线用户列表
           refresh_line_users(obj.list);
       }else if(obj.token=="broadcast"){//广播消息
           $("#msg").after("<p>"+obj.fromUser+" 对你们说:"+obj.content+"</p>");
       }else if(obj.token=="info"){//私信
           $("#msg").after("<p>"+obj.fromUser+" 对你说:"+obj.content+"</p>");
       }else if(obj.token=="socket_info"){//系统提醒消息
           $("#msg").after("<p>"+obj.fromUser+" 响应消息:"+obj.content+"</p>");
       }
    } 
    
    //有错误信息时执行
    function onError(evt) { 
       console.log('Error occured: ' + evt.data); 
    }
    
    //向服务器发送消息,此方式实际生产中考虑消息加密传递
    function sendd(msg){
        sock.send(msg);
        
    }
    
    //刷新在线用户列表
    function refresh_line_users(data){
        
        $("#userlist").empty();
        for(var i=0;i<data.length;i++){
            $("#userlist").append("<option value='"+$.trim(data[i])+"'>"+$.trim(data[i])+"</option>");
        }
        $("#userlist").append("<option value='all'>所有</option>");
        
    }
</script>
</head>
<body>
<h1>sockjs请求方式</h1>
        响应提示区域:<div id="msg" style="color:red"></div>
        <hr>
        
        
        <br>
        消息内容:
        <input type="text" id="content" name="content">
        <br><br>
         在线列表:
         <select id="userlist">
         </select>
        <br><br>
        我要发消息给:<input type="radio" id="server" name="who" value="server" checked="checked">服务器识别消息
        <input type="radio" id="one" name="who" value="one">我的
        <input type="radio" id="all" name="who" value="all">广播
        <br><br>
        <input type="button" id="connect" value="链接websocket服务器">
        <input type="button" id="send" value="发送" disabled="disabled">
        
</body>
</html>

下面的章节,会详细介绍上述代码。

相关文章
相关标签/搜索