经过HTTP接口实现调用MQTT Client发送数据,HTTP接口返回值为MQTT Client发送数据的对应结果。 HTTP接口为同步阻塞,MQTT Client 为异步回调方式。
如何实如今HTTP接口中调用MQTT Client发送数据后,可以阻塞等待MQTT返回结果,而后将结果返回?异步
CountDownLatch + Callbale+FutureTaskide
1.CountDownLatch做用this
CountDownLatch实如今MQTT Client 发送数据后 到接收数据后这段时间的阻塞。 HTTP每次请求,新建一个CountDownLatch,而后将CountDownLatch做为值和deviceId做为KEY保存到Map中, 调用MQTT Client 发送数据后,countDownLatch.await(),进行同步等待 在MQTT Client接收数据的回调方法中更加deviceId取出CountDwonLatch而后计数减一
2.Callbale+FutureTask做用线程
将调用MQTT Client发送数据的过程,封装成Callable,投递发送任务时,经过返回的FutureTask的get()方法, 同步阻塞,直到结果返回。
1.Map保存CountDownObj用于同步阻塞等待MQTT Client返回结果,以及将返回结果传递个FutureTaskcode
private final static ConcurrentMap<String, CountDownObj> countDownLatchMap = new ConcurrentHashMap<>(); //线程池 private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 5, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), runnable -> { Thread thread = new Thread(runnable, "mqtt thread"); return thread; });
2.HTTP API 调用的发送MQTT 消息数据的接口对象
/** * HTTP API 调用的发送MQTT 消息数据的接口 * 同步阻塞 */ public Integer send(Long packageId, String deviceId) throws Exception { ...... FutureTask<Integer> futureTask = sendTask(publishDto)); return futureTask.get() }
3.投递发送MQTT指令的task方法接口
/** * 投递MQTT发送指令任务 * 同步阻塞 */ private FutureTask<Integer> sendTask(PublishDto publishDto) throws Exception { FutureTask<Integer> futureTask = new FutureTask<>(new GetDatapointValueCallable(publishDto)); threadPoolExecutor.execute(futureTask); //阻塞线程 return futureTask; }
4.封装CountDownLatch 和 Integer的对象,用于CountDownLatch阻塞控制和返回结果rem
/** * 封装CountDownLatch 和 Integer * 用于CountDownLatch阻塞控制和返回结果 */ private class CountDownObj { private final CountDownLatch countDownLatch; private volatile Integer value; private CountDownObj(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } public CountDownLatch getCountDownLatch() { return countDownLatch; } public Integer getValue() { return value; } public void setValue(Integer value) { this.value = value; } }
5.具体发送MQTT数据的Callbale线程Task,会新建CountDownLatch,并经过CountDownLatch.await()方法阻塞,直到MQTT回调接收到数据或者超时。get
/** * 发送MQTT消息的任务Callable */ private class GetDatapointValueCallable implements Callable<Integer> { private final PublishDto publishDto; GetDatapointValueCallable(PublishDto publishDto) { this.publishDto = publishDto; } @Override public Integer call() throws Exception { //mqtt client 发送数据,此处具体代码省略 ...... CountDownLatch countDownLatch = new CountDownLatch(1); countDownLatchMap.putIfAbsent(publishDto.getDeviceId(), new CountDownObj(countDownLatch)); //阻塞,超时时间3s countDownLatch.await(3, TimeUnit.SECONDS); //返回mqtt指令对应的结果或者null return countDownLatchMap.remove(publishDto.getDeviceId()).getValue(); } }
6.MQTT接收数据回调,这里经过deviceId从MAP里面取到CountDownObj,释放闭锁(结束callable线程的等待)和设置MQTT返回的结果(即callable中call()返回的结果,也就是FutureTask的get()方法返回的结果)。同步
/** * MQTT 接收数据回调 */ void mqttReceiveCallback(String deviceId, String datapointId, String value) { ...... //接收到数据后,经过闭锁释放阻塞的线程,同时设置结果返回给调用者 CountDownObj countDownObj=countDownLatchMap.get(deviceId); if(countDownObj!=null) { countDownObj.setValue(Integer.parseInt(value)); countDownObj.getCountDownLatch().countDown(); } ....... }