最近有一个将 mysql 数据导入到 MongoDB 中的需求,打算使用 Kettle 工具实现。本文章记录了数据导入从0到1的过程,最终实现了每秒钟快速导入约 1200 条数据。一块儿来看吧~
简单说下该转换流程,增量导入数据:mysql
<!--more-->linux
1)根据 source 和 db 字段来获取 MongoDB 集合内 business_time 最大值。sql
2)设置 mysql 语句shell
3)对查询的字段进行更名数据库
4)过滤数据:只往 MongoDB 里面导入 person_id,address,business_time 字段均不为空的数据。json
根据 source 和 db 字段来获取 bussiness_time 的最大值,Kettle 的 MongoDB 查询语句以下图所示:segmentfault
对应的 MongDB 的写法为:网络
记得勾选 Query is aggregation pipeline 选项:工具
取消选中 Output single JSON field ,表示下一组件接收到的结果是一个 Number 类型的单值,不然就是一个 json 对象。大数据
设置 mysql 数据库 jdbc 链接后,填好 SQL 语句以后,在下方的“从步骤插入数据”下拉列表中,选中“MongoDB input”。“MongoDB input” 中的变量,在 SQL 语句中用 ? 表示,以下图所示:
若是导数的时候发生中文乱码,能够点击 编辑 ,选择 数据库链接 的 选项,添加配置项:characterEncoding utf8,便可解决。以下图所示:
若是查询出来的列名须要更改,则可使用“字段选择”组件,该组件还能够移除某字段,本次应用中,主要使用该组件将字段名进行修改。以下图所示:
只保留 person_id,address,business_time 字段都不为空的数据:
很简单,在“增长常量”组件内设置好要增长常量的类型和值便可。
添加“Excel 输出”,设置好文件名,若是有必要的话还能够设置 Excel 字段格式,以下图所示:
以下图所示,因为一开始就介绍了 MongoDB 的链接方式,因此在这里不在赘述。
根据 id、source、db 字段插入更新数据,以下图所示:
更多 MongoDB output 可参考:https://wiki.pentaho.com/disp...
为 mysql 查询字段添加索引。(略)
对 MongoDB 查询作优化,建立复合索引:
对于 MongoDB input 组件来讲,会关联查询出 business_time 最大值,因此要建立复合索引,建立复合索引时要注意字段顺序,按照查询顺序建立:
db.trajectory_data.createIndex({source: 1, db: 1, business_time: 1})
对于 MongoDB output 组件来讲,由于已经设置了 插入或更新 数据的规则,也会涉及到查询,因此再设置一个复合索引:
db.trajectory_data.createIndex({id: 1, source: 1, db: 1})
运行前,须要在集合内插入一条含 business_time 字段的 demo 数据,不然 MongoDB input 会由于查不到数据而报错:
db.trajectory_data.insert({ id: 0, source: 'xx数据', db: "17-db2", business_time: 0 })
成功插入数据后,执行该转换:
可经过点击 “执行结果” --> “步骤度量” 来查看各组件运行状态,以下图所示:
24 分钟共导了 172 万的数据,每秒钟约导入 1200 条数据。
这样子,这个转换基本就算完成了。能够在 linux 上写一个定时任务去执行这个转换,每次转换 mysql 都会将大于 mongoDB 集合中 business_time 字段最大值的数据增量导入到 MongoDB 中。
像上述的 Kettle 流程也是有不足的。假如一次性拉取的数据量过大,颇有可能致使 Mysql 或 Kettle 内存溢出而报错。因此上述流程只适合小数据量导入。大数据量导入的话仍是建议分批次导入或者分页导入,你们能够研究一下。