java-websocket客户端 断线重连 注入Service问题

java版客户端:java

使用开源项目java-websocket, github地址: https://github.com/TooTallNate/Java-WebSocketgit

github上有不少示例,具体能够去查看github

此处主要是记录java-websocket实现客户端,并解决没法使用Service层方法(service为null)的问题,以及断线重连web

引用包spring

<dependency>
    <groupId>org.java-websocket</groupId>
    <artifactId>Java-WebSocket</artifactId>
    <version>1.3.9</version>
</dependency>

初版,使用getBean获取Service层方法,而且实现断线重连

使用的是GitHub上的demo示例apache

import com.alibaba.fastjson.JSONArray;import com.sensor.vibration.utils.ApplicationContextRegister; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.java_websocket.client.WebSocketClient; import org.java_websocket.drafts.Draft; import org.java_websocket.handshake.ServerHandshake; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import java.net.URI; import java.util.Map; /** This example demonstrates how to create a websocket connection to a server. Only the most important callbacks are overloaded. */ @Slf4j public class SensorWebSocketClient extends WebSocketClient { @Autowired private UserService userService; public SensorWebSocketClient(URI serverUri , Draft draft ) { super( serverUri, draft ); } public SensorWebSocketClient(URI serverURI ) { super( serverURI ); } public SensorWebSocketClient(URI serverUri, Map<String, String> httpHeaders ) { super(serverUri, httpHeaders); } @Override public void onOpen( ServerHandshake handshakedata ) { System.out.println( "opened connection" ); // if you plan to refuse connection based on ip or httpfields overload: onWebsocketHandshakeReceivedAsClient
 } @Override public void onMessage( String msg ) { log.info("[websocket] 收到消息={}",msg); } @Override public void onClose( int code, String reason, boolean remote ) { // The codecodes are documented in class org.java_websocket.framing.CloseFrame
        System.out.println( "Connection closed by " + ( remote ? "remote peer" : "us" ) + " Code: " + code + " Reason: " + reason ); } @Override public void onError( Exception ex ) { ex.printStackTrace(); // if the error is fatal then onClose will be called additionally
 } }

新建一个类,建立一个方法,启动websocketjson

import java.net.URI; import java.net.URISyntaxException; /** * Simple example to reconnect blocking and non-blocking. */

public class ReconnectClient { public static void reconnect() throws URISyntaxException, InterruptedException{ SensorWebSocketClient c = new SensorWebSocketClient( new URI( "ws://localhost:5005/websocket" ) ); c.connectBlocking();

        new Thread(new Runnable() { public void run() { System.out.println("Runnable running.."); } }) { public void run() { while (true){ try{ Thread.sleep(3000); c.send(""); }catch (Exception e){ c.reconnect(); } } }; }.start(); } }

在新建一个类,程序启动的时候,调用上面的方法安全

import com.sensor.vibration.utils.Common; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.net.URISyntaxException; import java.util.Map; @Slf4j @Component public class InitStart implements CommandLineRunner { @Override public void run(String... args) throws URISyntaxException, InterruptedException{ ReconnectClient.reconnect(); } }

中间的启动类的方法能够省去,直接写在InitStart的run方法里面websocket

如今还不能使用Service层的方法,会报service为null异常,百度后,参考别人使用getBean方法,写一个工具类java-web

import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component;  @Component @Lazy(false) public class ApplicationContextRegister  implements ApplicationContextAware { private static ApplicationContext APPLICATION_CONTEXT; /** * 设置spring上下文 * * @param applicationContext spring上下文 * @throws BeansException */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { APPLICATION_CONTEXT = applicationContext; } public static ApplicationContext getApplicationContext() { return APPLICATION_CONTEXT; } }

在 SensorWebSocketClient.java 中使用Service

@Autowired private UserService userService; ApplicationContext act = ApplicationContextRegister.getApplicationContext(); userService=act.getBean(UserService.class);

可是 领导不让用getBean这种方法,放弃

第二版,使用Service层方法版本 + 断线重连

实现:

import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import java.net.URI; /** * Created by Chow on 2019/8/22 */ @Slf4j @Component public class WebSocketClientStart { @Autowired private UserService userService; @Bean public WebSocketClient webSocketClient() { try { WebSocketClient webSocketClient = new WebSocketClient(new URI("ws://127.0.0.1:5005/websocket")) { @Override public void onOpen(ServerHandshake handshakedata) { log.info("[websocket] 链接成功"); } @Override public void onMessage(String msg) { try{ log.info("[websocket] 收到消息={}",msg); if (msg == null || StringUtils.isBlank(msg)){ log.error("the msg message of websocket received is null"); this.send(""); return; } JSONArray jsonArray = JSONArray.parseArray(msg); if (jsonArray == null || jsonArray.size() == 0){ log.info("log: the message of websocket received is empty"); } vibrationAlarmService.alarmAnalysis(jsonArray); this.send(""); }catch (Exception e){ log.error(e.getMessage(), e); this.send(""); } } @Override public void onClose(int code, String reason, boolean remote) { log.info("[websocket] 退出链接"); } @Override public void onError(Exception ex) { log.info("[websocket] 链接错误={}",ex.getMessage()); } }; webSocketClient.connect(); new Thread(new Runnable() { public void run() { System.out.println("Runnable running.."); } }) { public void run() { while (true){ try{ Thread.sleep(3000); webSocketClient.send(""); }catch (Exception e){ webSocketClient.reconnect(); } } } }.start(); return webSocketClient; } catch (Exception e) { e.printStackTrace(); } return null; } }

 

 

 测试服务端代码

须要引入包

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
    <version>2.1.3.RELEASE</version>
</dependency>

server:

import java.io.IOException; import java.util.concurrent.CopyOnWriteArraySet; import javax.websocket.OnClose; import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.ServerEndpoint; import org.springframework.stereotype.Component; @ServerEndpoint(value = "/websocket") @Component public class MyWebSocket { //静态变量,用来记录当前在线链接数。应该把它设计成线程安全的。
    private static int onlineCount = 0; //concurrent包的线程安全Set,用来存放每一个客户端对应的MyWebSocket对象。
    private static CopyOnWriteArraySet<MyWebSocket> webSocketSet = new CopyOnWriteArraySet<MyWebSocket>(); //与某个客户端的链接会话,须要经过它来给客户端发送数据
    private Session session; /** * 链接创建成功调用的方法*/ @OnOpen public void onOpen(Session session) { this.session = session; webSocketSet.add(this);     //加入set中
        addOnlineCount();           //在线数加1
        System.out.println("有新链接加入!当前在线人数为" + getOnlineCount()); try { sendMessage("当前在线人数为" + getOnlineCount()); } catch (IOException e) { System.out.println("IO异常"); } } /** * 链接关闭调用的方法 */ @OnClose public void onClose() { webSocketSet.remove(this);  //从set中删除
        subOnlineCount();           //在线数减1
        System.out.println("有一链接关闭!当前在线人数为" + getOnlineCount()); } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息*/ @OnMessage public void onMessage(String message, Session session) { System.out.println("来自客户端的消息:" + message); //群发消息
        for (MyWebSocket item : webSocketSet) { try { item.sendMessage(message); } catch (IOException e) { e.printStackTrace(); } } } @OnError public void onError(Session session, Throwable error) { System.out.println("发生错误"); error.printStackTrace(); } public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 群发自定义消息 * */
    public static void sendInfo(String message) throws IOException { for (MyWebSocket item : webSocketSet) { try { item.sendMessage(message); } catch (IOException e) { continue; } } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { MyWebSocket.onlineCount++; } public static synchronized void subOnlineCount() { MyWebSocket.onlineCount--; } }
相关文章
相关标签/搜索