分布式项目(五)iot-pgsql

书接上回,在Mapping server中,咱们已经把数据都整理好了,如今利用postgresql存储历史数据。git

iot-pgsql

构建iot-pgsql模块,这里咱们写数据库为了性能考虑不在使用mybatis,换成spring jdbc批处理写数据库。spring

引入spring jdbc依赖sql

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
            <version>2.1.4.RELEASE</version>
        </dependency>

由于数据来此订阅的kafka数据,因此还须要引入kafka依赖,这里已经kakfa隔离成了一个独立的模块,因此加入kakfa模块就好了。数据库

<dependency>
            <groupId>cn.le</groupId>
            <artifactId>iot-kafka</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

kakfa监听

[@Component](https://my.oschina.net/u/3907912)
public class PgListener {

    @Autowired
    private PgService pgService;
    private static List<DeviceDataPO> INSERT_LIST = new ArrayList<>(1100);

    @KafkaListener(topics = DOWN_TOPIC)
    public void pgListener(String msg){
        KafkaDownVO downVO = JSONObject.parseObject(msg,KafkaDownVO.class);

        downVO.getDataList().forEach(propertyData -> {
            DeviceDataPO po = new DeviceDataPO();
            po.setDeviceId(downVO.getDeviceId());
            po.setPropertyId(propertyData.getPropertyId());
            po.setData(propertyData.getData());
            po.setCollTime(po.getCollTime());
            po.setCreateAt(new Date());
            INSERT_LIST.add(po);
        });

        //批量写入数据库
        if (INSERT_LIST.size() > 1000){
            jdbcBachInsert(INSERT_LIST);
        }
    }

    public void jdbcBachInsert(List<DeviceDataPO> downVOList){
        if (CollectionUtils.isEmpty(downVOList)){
            return;
        }
        pgService.jdbcBachInsert(downVOList);
        INSERT_LIST = new ArrayList<>(1100);
    }
}

jdbc批量写入

[@Service](https://my.oschina.net/service)
public class PgService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Transactional(rollbackFor = Exception.class)
    public void jdbcBachInsert(List<DeviceDataPO> list){
        jdbcTemplate.batchUpdate(PG_INSERT_SQL, new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps, int i) throws SQLException {
                DeviceDataPO po = list.get(i);
                ps.setLong(1,po.getDeviceId());
                ps.setLong(2,po.getPropertyId());
                ps.setString(3,po.getData());
                ps.setTimestamp(4,new Timestamp(po.getCollTime().getTime()));
                ps.setTimestamp(5,new Timestamp(po.getCreateAt().getTime()));
            }

            @Override
            public int getBatchSize() {
                return list.size();
            }
        });
    }
}

OK,完成,启动项目,看一下数据库是否有写入的数据 mybatis

结束语

数据已经存储到数据库中了,下面就开始对设备数据的监控了。app

https://gitee.com/distant/iot-pt.gitide

相关文章
相关标签/搜索