在使用阿里云官方IoT Java Device SDK连接云端测试的时候,发现日志总是会打印一些莫名其妙的Topic消息的订阅和发布,可是用户并没有操作这些Topic。这是由于SDK底层默认做了很多系统Topic的订阅和发布设置,且无法关闭,导致很多测试不能满足预期的测试期望。如果不希望一些系统Topic的默认订阅和发布,建议可以使用开源MQTT Client进行Topic消息的订阅和发布。
一、创建产品和设备
参考:阿里云物联网平台 Quick Start 创建产品和设备部分。
二、pom.xml
<dependencies>
    <!-- Eclipse Paho MQTT v3 client used by the demo to connect, publish and subscribe. -->
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.1.0</version>
    </dependency>
    <!-- Guava; in the demo below it is only referenced by the commented-out scheduling example. -->
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>23.0</version>
    </dependency>
</dependencies>
三、工具类 AliyunIoTSignUtil
import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;

/**
 * Builds the HMAC signature over a set of connection parameters.
 *
 * <p>The signature is computed over the concatenation of every {@code key + value}
 * pair, with keys sorted in natural (lexicographic) order and any key equal to
 * {@code "sign"} (case-insensitive) left out. The result is an upper-case hex string.
 */
public class AliyunIoTSignUtil {

    /** Upper-case hex alphabet; a lookup table keeps the output independent of the default locale. */
    private static final char[] HEX_DIGITS = "0123456789ABCDEF".toCharArray();

    /**
     * Signs the given parameters with the device secret.
     *
     * @param params       parameters to sign; an entry whose key is "sign" (any case) is ignored
     * @param deviceSecret secret used as the HMAC key
     * @param signMethod   JCA MAC algorithm name, e.g. {@code "hmacsha1"}
     * @return upper-case hex encoding of the MAC
     * @throws RuntimeException wrapping any failure raised while computing the MAC
     */
    public static String sign(Map<String, String> params, String deviceSecret, String signMethod) {
        // Sort the parameter keys so the canonical string does not depend on map iteration order.
        String[] sortedKeys = params.keySet().toArray(new String[0]);
        Arrays.sort(sortedKeys);

        // Build the canonical string: key1value1key2value2...
        StringBuilder canonicalizedQueryString = new StringBuilder();
        for (String key : sortedKeys) {
            if ("sign".equalsIgnoreCase(key)) {
                continue;
            }
            canonicalizedQueryString.append(key).append(params.get(key));
        }

        try {
            return encryptHMAC(signMethod, canonicalizedQueryString.toString(), deviceSecret);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Computes an HMAC of {@code content} keyed by {@code key}; both are encoded as UTF-8.
     *
     * @param signMethod JCA MAC algorithm name, e.g. {@code "hmacsha1"}
     * @param content    text to authenticate
     * @param key        secret key text
     * @return upper-case hex encoding of the MAC bytes
     * @throws Exception if the algorithm is unavailable or the key is rejected; the broad
     *                   declaration is kept so existing callers continue to compile
     */
    public static String encryptHMAC(String signMethod, String content, String key) throws Exception {
        SecretKey secretKey = new SecretKeySpec(key.getBytes(StandardCharsets.UTF_8), signMethod);
        Mac mac = Mac.getInstance(secretKey.getAlgorithm());
        mac.init(secretKey);
        byte[] data = mac.doFinal(content.getBytes(StandardCharsets.UTF_8));
        return bytesToHexString(data);
    }

    /**
     * Encodes bytes as upper-case hexadecimal, two characters per byte.
     *
     * @param bArray bytes to encode
     * @return hex string of length {@code 2 * bArray.length}
     */
    public static final String bytesToHexString(byte[] bArray) {
        StringBuilder sb = new StringBuilder(bArray.length * 2);
        for (byte b : bArray) {
            int unsigned = b & 0xFF;
            sb.append(HEX_DIGITS[unsigned >>> 4]).append(HEX_DIGITS[unsigned & 0x0F]);
        }
        return sb.toString();
    }
}
四、main方法
import com.alibaba.taro.AliyunIoTSignUtil; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import java.util.HashMap; import java.util.Map; public class IoTDemoPubSubDemo { public static String productKey = "********"; public static String deviceName = "OpenMQTTDevice"; public static String deviceSecret = "********"; public static String regionId = "cn-shanghai"; // 物模型-属性上报topic private static String pubTopic = "/sys/" + productKey + "/" + deviceName + "/thing/event/property/post"; // 自定义topic,在产品Topic列表位置定义 private static String subTopic = "/"+productKey + "/" + deviceName+"/user/newdatademo"; private static MqttClient mqttClient; public static void main(String [] args){ initAliyunIoTClient(); // ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1, // new ThreadFactoryBuilder().setNameFormat("thread-runner-%d").build()); // // scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), 10,10, TimeUnit.SECONDS); // 汇报属性 postDeviceProperties(); try { mqttClient.subscribe(subTopic); // 订阅Topic } catch (MqttException e) { System.out.println("error:" + e.getMessage()); e.printStackTrace(); } // 设置订阅监听 mqttClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable throwable) { System.out.println("connection Lost"); } @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { System.out.println("Sub message"); System.out.println("Topic : " + s); System.out.println(new String(mqttMessage.getPayload())); //打印输出消息payLoad } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { } }); } /** * 初始化 Client 对象 */ private static void initAliyunIoTClient() { try { // 构造链接须要的参数 String clientId = "java" + System.currentTimeMillis(); Map<String, String> params = new HashMap<>(16); params.put("productKey", productKey); params.put("deviceName", deviceName); params.put("clientId", clientId); String timestamp = 
String.valueOf(System.currentTimeMillis()); params.put("timestamp", timestamp); // cn-shanghai String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883"; String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|"; String mqttUsername = deviceName + "&" + productKey; String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1"); connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword); } catch (Exception e) { System.out.println("initAliyunIoTClient error " + e.getMessage()); } } public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception { MemoryPersistence persistence = new MemoryPersistence(); mqttClient = new MqttClient(url, clientId, persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); // MQTT 3.1.1 connOpts.setMqttVersion(4); connOpts.setAutomaticReconnect(false); connOpts.setCleanSession(true); connOpts.setUserName(mqttUsername); connOpts.setPassword(mqttPassword.toCharArray()); connOpts.setKeepAliveInterval(60); mqttClient.connect(connOpts); } /** * 汇报属性 */ private static void postDeviceProperties() { try { //上报数据 //高级版 物模型-属性上报payload System.out.println("上报属性值"); String payloadJson = "{\"params\":{\"Status\":0,\"Data\":\"15\"}}"; MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8")); message.setQos(1); mqttClient.publish(pubTopic, message); } catch (Exception e) { System.out.println(e.getMessage()); } } }
五、运行测试状况
工具
基于开源MQTT自主接入阿里云IoT平台(Java)
MQTT-TCP连接通信
原文链接
本文为云栖社区原创内容,未经允许不得转载。