上回说道CoAp client和server的实现,数据也按照既定格式发送到了kafka中,接下来就是Mapping server的实现,物理设备数据映射到抽象设备上,并赋予数据业务含义。git
构建iot-mapping模块,引入kafka公共模块web
SourceListener监听Coap server 发送的原始数据,并从redis中取出web manage缓存的产品物模型和设备数据,因为redis公用的比较多,因此这也构建一个iot-redis模块,用与redis操做。redis
@Autowired private BaseRedisUtil redisUtil; @KafkaListener(topics = SOURCE_TOPIC) public void iotListener(String msg){ System.out.println("-----------"+msg); KafkaSourceVO sourceVO = JSONObject.parseObject(msg, KafkaSourceVO.class); //设备信息 RedisDeviceVO deviceVO = redisUtil.get(sourceVO.getImei()); //产品信息 RedisProductVO productVO = redisUtil.get(deviceVO.getProductId()); if (EDataFormat.BYTE.getFormat().equals(productVO.getFormat())){ analysisByte(sourceVO,productVO,deviceVO); }else if (EDataFormat.JSON.getFormat().equals( productVO.getFormat())){ analysisJson(sourceVO,productVO,deviceVO); } }
public void analysisByte(KafkaSourceVO sourceVO, RedisProductVO productVO,RedisDeviceVO deviceVO){ char[] chars = sourceVO.getData().toCharArray(); List<RedisPropertyVO> propertys = productVO.getPropertys(); KafkaDownVO downVO = new KafkaDownVO(); downVO.setDeviceId(deviceVO.getId()); downVO.setCollTime(sourceVO.getCollTime()); List<KafkaDownVO.PropertyData> propertyDatas = new ArrayList<>(propertys.size()); propertys.forEach(property->{ String[] str = property.getOfset().split("-"); int begin = Integer.valueOf(str[0]); int end = Integer.valueOf(str[1]); KafkaDownVO.PropertyData data = new KafkaDownVO.PropertyData(); data.setPropertyId(property.getId()); StringBuilder sb = new StringBuilder(); for (int i = begin;i <= end; i++){ sb.append(chars[i]); } data.setData(sb.toString()); propertyDatas.add(data); }); downVO.setDataList(propertyDatas); System.out.println("byte---"+downVO); }
public void analysisJson(KafkaSourceVO sourceVO, RedisProductVO productVO,RedisDeviceVO deviceVO){ Map<String,Long> propertyMap = productVO.getPropertys(). stream().collect(Collectors.toMap(RedisPropertyVO :: getOfset,RedisPropertyVO::getId)); Map<String,String> dataMap = JSONObject.parseObject( sourceVO.getData(), HashMap.class); KafkaDownVO downVO = new KafkaDownVO(); downVO.setDeviceId(deviceVO.getId()); downVO.setCollTime(sourceVO.getCollTime()); List<KafkaDownVO.PropertyData> propertyDatas = new ArrayList<>(dataMap.size()); dataMap.forEach((key,val)->{ KafkaDownVO.PropertyData data = new KafkaDownVO.PropertyData(); data.setPropertyId(propertyMap.get(key)); data.setData(val); propertyDatas.add(data); }); downVO.setDataList(propertyDatas); System.out.println("json---"+downVO); }
启动项目,检验一下数据是否封装正确json
按iot-pt架构设计,如今须要把映射好的数据,再次写入kakfa中,供订阅费服务使用缓存
在iot-kafka模块中添加对Mapping 数据的写入架构
[@Component](https://my.oschina.net/u/3907912) public class KafkaMapping { @Autowired private KafkaTemplate kafkaTemplate; public void send(KafkaDownVO downVO){ String json = JSONObject.toJSONString(downVO); kafkaTemplate.send(DOWN_TOPIC,json); } }
修改analysisByte()和analysisJson()app
public void analysisByte(KafkaSourceVO sourceVO, RedisProductVO productVO,RedisDeviceVO deviceVO){ char[] chars = sourceVO.getData().toCharArray(); List<RedisPropertyVO> propertys = productVO.getPropertys(); KafkaDownVO downVO = new KafkaDownVO(); downVO.setDeviceId(deviceVO.getId()); downVO.setCollTime(sourceVO.getCollTime()); List<KafkaDownVO.PropertyData> propertyDatas = new ArrayList<>(propertys.size()); propertys.forEach(property->{ String[] str = property.getOfset().split("-"); int begin = Integer.valueOf(str[0]); int end = Integer.valueOf(str[1]); KafkaDownVO.PropertyData data = new KafkaDownVO.PropertyData(); data.setPropertyId(property.getId()); StringBuilder sb = new StringBuilder(); for (int i = begin;i <= end; i++){ sb.append(chars[i]); } data.setData(sb.toString()); propertyDatas.add(data); }); downVO.setDataList(propertyDatas); kafkaMapping.send(downVO); }
public void analysisJson(KafkaSourceVO sourceVO, RedisProductVO productVO,RedisDeviceVO deviceVO){ Map<String,Long> propertyMap = productVO.getPropertys(). stream().collect(Collectors.toMap(RedisPropertyVO :: getOfset,RedisPropertyVO::getId)); Map<String,String> dataMap = JSONObject.parseObject( sourceVO.getData(), HashMap.class); KafkaDownVO downVO = new KafkaDownVO(); downVO.setDeviceId(deviceVO.getId()); downVO.setCollTime(sourceVO.getCollTime()); List<KafkaDownVO.PropertyData> propertyDatas = new ArrayList<>(dataMap.size()); dataMap.forEach((key,val)->{ KafkaDownVO.PropertyData data = new KafkaDownVO.PropertyData(); data.setPropertyId(propertyMap.get(key)); data.setData(val); propertyDatas.add(data); }); downVO.setDataList(propertyDatas); kafkaMapping.send(downVO); }
再次启动项目,教研Mapping数据是否成写入kakfaide
接下来就是订阅服务的实现了,请听下回分解,具体的代码细节在gitui