WebSocket和kafka实现数据实时推送到前端

一. 需求背景
     最近新接触一个需求,须要将kafka中的数据实时推送到前端展现。最开始想到的是前端轮询接口数据,可是没法保证轮询的频率和消费的频率彻底一致,或形成数据缺失等问题。最终肯定用利用WebSocket实现数据的实时推送。
 
二. websocket简介
     网上已经有好多介绍WebSocket的文章了,就不详细介绍了,这里只作简单介绍。 WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通讯——容许服务器主动发送信息给客户端。
 
三. 服务端实现
  1. pom文件
  这里须要引用三个依赖。第一个为WebSocket须要的依赖,另外两个为kafka的依赖
 

<dependencies>
<!-- webSocket所需依赖 -->
<dependency>
<groupId>javax</groupId>
<artifactId>javaee-api</artifactId>
<version>7.0</version>
</dependency>
<!-- kafka 所需依赖 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>RELEASE</version>
</dependency>
</dependencies>

javascript

2. webSocket服务端实现html

1 //此处定义接口的uri
 2 @ServerEndpoint("/wbSocket")
 3 public class WebSocket {
 4     private Session session;
 5     public static CopyOnWriteArraySet<WebSocket> wbSockets = new CopyOnWriteArraySet<WebSocket>(); //此处定义静态变量,以在其余方法中获取到全部链接
 6     
 7     /**
 8      * 创建链接。
 9      * 创建链接时入参为session
10      */
11     @OnOpen
12     public void onOpen(Session session){
13         this.session = session;
14         wbSockets.add(this); //将此对象存入集合中以在以后广播用,若是要实现一对一订阅,则类型对应为Map。因为这里广播就能够了随意用Set
15         System.out.println("New session insert,sessionId is "+ session.getId());
16     }
17     /**
18      * 关闭链接
19      */
20     @OnClose
21     public void onClose(){
22         wbSockets.remove(this);//将socket对象从集合中移除,以便广播时不发送次链接。若是不移除会报错(须要测试)
23         System.out.println("A session insert,sessionId is "+ session.getId());
24     }
25     /**
26      * 接收前端传过来的数据。
27      * 虽然在实现推送逻辑中并不须要接收前端数据,可是做为一个webSocket的教程或叫备忘,仍是将接收数据的逻辑加上了。
28      */
29     @OnMessage
30     public void onMessage(String message ,Session session){
31         System.out.println(message + "from " + session.getId());
32     }
33 
34     public void sendMessage(String message) throws IOException {
35         this.session.getBasicRemote().sendText(message);
36     }
37 }

 

3. kafka消费者实现前端

复制代码
1 public class ConsumerKafka extends Thread {
 2 
 3     private KafkaConsumer<String,String> consumer;
 4     private String topic = "kafkaTopic";
 5 
 6     public ConsumerKafka(){
 7 
 8     }
 9 
10     @Override
11     public void run(){
12         //加载kafka消费者参数
13         Properties props = new Properties();
14         props.put("bootstrap.servers", "localhost:9092");
15         props.put("group.id", "ytna");
16         props.put("enable.auto.commit", "true");
17         props.put("auto.commit.interval.ms", "1000");
18         props.put("session.timeout.ms", "15000");
19         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
20         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
21         //建立消费者对象
22         consumer = new KafkaConsumer<String,String>(props);
23         consumer.subscribe(Arrays.asList(this.topic));
24         //死循环,持续消费kafka
25         while (true){
26             try {
27                //消费数据,并设置超时时间
28                 ConsumerRecords<String, String> records = consumer.poll(100);
29                 //Consumer message
30                 for (ConsumerRecord<String, String> record : records) {
31                     //Send message to every client
32                     for (WebSocket webSocket :wbSockets){
33                         webSocket.sendMessage(record.value());
34                     }
35                 }
36             }catch (IOException e){
37                 System.out.println(e.getMessage());
38                 continue;
39             }
40         }
41     }
42 
43     public void close() {
44         try {
45             consumer.close();
46         } catch (Exception e) {
47             System.out.println(e.getMessage());
48         }
49     }
50 
51     //供测试用,若经过tomcat启动需经过其余方法启动线程
52     public static void main(String[] args){
53         ConsumerKafka consumerKafka = new ConsumerKafka();
54         consumerKafka.start();
55     }
56 }
复制代码
 
P.S. 须要注意的是WebSocket对tomcat版本是有要求的,笔者使用的是7.0.7.8。
 
四. 前端简单实现
复制代码
1 <!DOCTYPE html>
 2 <html lang="en">
 3 <head>
 4     <meta charset="UTF-8">
 5     <title>WebSocket client</title>
 6     <script type="text/javascript">
 7         var socket;
 8         if (typeof (WebSocket) == "undefined"){
 9             alert("This explorer don't support WebSocket")
10         }
11 
12         function connect() {
13             //Connect WebSocket server
14             socket =new WebSocket("ws://127.0.0.1:8080/wbSocket");
15             //open
16             socket.onopen = function () {
17                 alert("WebSocket is open");
18             }
19             //Get message
20             socket.onmessage = function (msg) {
21                 alert("Message is " + msg);
22             }
23             //close
24             socket.onclose = function () {
25                 alert("WebSocket is closed");
26             }
27             //error
28             socket.onerror = function (e) {
29                 alert("Error is " + e);
30             }
31         }
32 
33         function close() {
34             socket.close();
35         }
36 
37         function sendMsg() {
38             socket.send("This is a client message ");
39         }
40     </script>
41 </head>
42 <body>
43     <button onclick="connect()">connect</button>
44     <button onclick="close()">close</button>
45     <button onclick="sendMsg()">sendMsg</button>
46 </body>
47 </html>
复制代码
 
五. 结语
     以上基本能够实现将kafka数据实时推送到前端。这是笔者第一篇笔记,不足之处请指出、谅解。
     源码:https://github.com/youtNa/webSocketkafka
   引用:1.  webSocket百度百科
相关文章
相关标签/搜索