使用 DataX 增量同步数据(转)

关于 DataX

DataX 是阿里巴巴集团内被普遍使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各类异构数据源之间高效的数据同步功能。html

若是想进一步了解 DataX ,请进一步查看 DataX 详细介绍 。git

关于增量更新

DataX 支持多种数据库的读写, json 格式配置文件很容易编写, 同步性能很好, 一般能够达到每秒钟 1 万条记录或者更高, 能够说是至关优秀的产品, 可是缺少对增量更新的内置支持。github

其实增量更新很是简单, 只要从目标数据库读取一个最大值的记录, 多是 DateTime 或者 RowVersion 类型, 而后根据这个最大值对源数据库要同步的表进行过滤, 而后再进行同步便可。sql

因为 DataX 支持多种数据库的读写, 一种相对简单而且可靠的思路就是:docker

  1. 利用 DataX 的 DataReader 去目标数据库读取一个最大值;
  2. 将这个最大值用 TextFileWriter 写入到一个 CSV 文件;
  3. 用 Shell 脚原本读取 CSV 文件, 并动态修改所有同步的配置文件;
  4. 执行修改后的配置文件, 进行增量同步。

接下来就用 shell 脚原本一步一步实现增量更新。shell

增量更新的 shell 实现

个人同步环境是从 SQLServer 同步到 PostgreSQL , 部分配置以下:数据库

{ "job": { "content": [ { "reader": { "name": "sqlserverreader", "parameter": { "username": "...", "password": "...", "connection": [ { "jdbcUrl": [ "jdbc:sqlserver://[source_server];database=[source_db]" ], "querySql": [ "SELECT DataTime, PointID, DataValue FROM dbo.Minutedata WHERE 1=1" ] } ] } }, "writer": { "name": "postgresqlwriter", "parameter": { "username": "...", "password": "...", "connection": [ { "jdbcUrl": "jdbc:postgresql://[target_server]:5432/[target_db]", "table": [ "public.minute_data" ] } ], "column": [ "data_time", "point_id", "data_value" ], "preSql": [ "TRUNCATE TABLE @table" ] } } } ], "setting": { } } }

 

更多的配置能够参考 SqlServerReader 插件文档以及 PostgresqlWriter 插件文档json

要实现增量更新, 首先要 PostgresqlReader 从目标数据库读取最大日期, 并用 TextFileWriter 写入到一个 csv 文件, 这一步个人配置以下所示:bash

{ "job": { "content": [ { "reader": { "name": "postgresqlreader", "parameter": { "connection": [ { "jdbcUrl": [ "jdbc:postgresql://[target_server]:5432/[target_db]" ], "querySql": [ "SELECT max(data_time) FROM public.minute_data" ] } ], "password": "...", "username": "..." } }, "writer": { "name": "txtfilewriter", "parameter": { "dateFormat": "yyyy-MM-dd HH:mm:ss", "fileName": "minute_data_max_time_result", "fileFormat": "csv", "path": "/scripts/", "writeMode": "truncate" } } } ], "setting": { } } }

 

更多的配置能够看考 PostgresqlDataReader 插件文档以及 TextFileWriter 插件文档工具

有了这两个配置文件, 如今能够编写增量同步的 shell 文件, 内容以下:

#!/bin/bash ### every exit != 0 fails the script set -e # 获取目标数据库最大数据时间,并写入一个 csv 文件 docker run --interactive --tty --rm --network compose --volume $(pwd):/scripts \ beginor/datax:3.0 \ /scripts/minute_data_max_time.json if [ $? -ne 0 ]; then
  echo "minute_data_sync.sh error, can not get max_time from target db!" exit 1
fi # 找到 DataX 写入的文本文件,并将内容读取到一个变量中 RESULT_FILE=`ls minute_data_max_time_result_*` MAX_TIME=`cat $RESULT_FILE` # 若是最大时间不为 null 的话, 修改所有同步的配置,进行增量更新; if [ "$MAX_TIME" != "null" ]; then # 设置增量更新过滤条件 WHERE="DataTime > '$MAX_TIME'"
  sed "s/1=1/$WHERE/g" minute_data.json > minute_data_tmp.json # 将第 45 行的 truncate 语句删除; sed '45d' minute_data_tmp.json > minute_data_inc.json # 增量更新 docker run --interactive --tty --rm --network compose --volume $(pwd):/scripts \ beginor/datax:3.0 \ /scripts/minute_data_inc.json # 删除临时文件 rm ./minute_data_tmp.json ./minute_data_inc.json else # 所有更新 docker run --interactive --tty --rm --network compose --volume $(pwd):/scripts \ beginor/datax:3.0 \ /scripts/minute_data.json fi

 

在上面的 shell 文件中, 使用我制做的 DataX docker 镜像, 使用命令 docker pull beginor/datax:3.0 便可获取该镜像, 当也能够修改这个 shell 脚本直接使用 datax 命令来执行。

为何用 shell 来实现

由于 DataX 支持多种数据库的读写, 充分利用 DataX 读取各类数据库的能力, 减小了不少开发工做, 毕竟 DataX 的可靠性是很好的。

 

文章来源:https://beginor.github.io/2018/06/29/incremental-sync-with-datax.html

相关文章
相关标签/搜索