It's Time to Change Your Data Warehouse's Incremental Sync Approach

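After a period of evolution, spark-binlog and delta-plus are gradually maturing. spark-binlog lets you use the MySQL binlog as a standard Spark data source, and it currently captures three event types: insert, update, and delete. delta-plus is an enhancement library for Delta Lake. For example, it implements replaying the binlog into a Delta table, which keeps the Delta table in near-real-time sync with the database table. Beyond that, delta-plus also integrates features such as Bloom filters to speed up data updates. For more features, see my column.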

Column: "Delta Lake in Depth" (zhuanlan.zhihu.com)

With these two libraries plus Spark, we can sync databases and tables with just two lines of code.

Previously, incremental data sync required a fairly involved multi-step pipeline.

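The problems are obvious: the pipeline is long, it involves many technologies, the intermediate staging is cumbersome, and real-time sync is hard to achieve. We would like something much simpler.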

Then I probably only need to write the following code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

val spark: SparkSession = ???

// Read the MySQL binlog as a streaming source.
val df = spark.readStream.
  format("org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource").
  option("host", "127.0.0.1").
  option("port", "3306").
  option("userName", "xxxxx").
  option("password", "xxxxx").
  option("databaseNamePattern", "mlsql_console").
  option("tableNamePattern", "script_file").
  option("binlogIndex", "4").
  option("binlogFileOffset", "4").
  load()

// Write the captured events to Delta, using "id" as the primary key.
df.writeStream.
  format("org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource").
  option("path", "/tmp/sync/tables").
  option("mode", "Append").
  option("idCols", "id").
  option("duration", "5").
  option("syncType", "binlog").
  option("checkpointLocation", "/tmp/cpl-binlog2").
  outputMode(OutputMode.Append()).
  start("{db}/{table}")

Both the read and the write are very simple. For the read, you supply the MySQL binlog connection details; for the write, you specify the primary key and the storage path of the table.
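To make the role of the primary key more concrete, here is a minimal plain-Scala sketch of what replaying insert/update/delete events by key amounts to. It is only a conceptual model: the `BinlogEvent` types and the `replay` function are hypothetical names invented for illustration, and this is not how delta-plus is implemented internally.

```scala
// Illustrative model only; these types and functions are hypothetical,
// not part of spark-binlog or delta-plus.
object ReplaySketch {
  type Row = Map[String, Any]

  sealed trait BinlogEvent { def row: Row }
  final case class Insert(row: Row) extends BinlogEvent
  final case class Update(row: Row) extends BinlogEvent
  final case class Delete(row: Row) extends BinlogEvent

  // Apply events in order. `idCol` plays the role of the idCols option:
  // it names the column that identifies which row an event refers to.
  def replay(table: Map[Any, Row], events: Seq[BinlogEvent], idCol: String): Map[Any, Row] =
    events.foldLeft(table) { (current, event) =>
      val key = event.row(idCol)
      event match {
        case Insert(row) => current + (key -> row)
        case Update(row) => current + (key -> row) // replace the row with the same key
        case Delete(_)   => current - key
      }
    }
}
```

Without a key column there would be no way to tell which existing row an update or delete refers to, which is why the write side asks for one.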

With MLSQL it is even simpler. Below is a complete streaming sync script:

set streamName="binlog";

load binlog.`` where
host="127.0.0.1"
and port="3306"
and userName="xxxx"
and password="xxxxxx"
and bingLogNamePrefix="mysql-bin"
and binlogIndex="4"
and binlogFileOffset="4"
and databaseNamePattern="mlsql_console"
and tableNamePattern="script_file"
as table1;

save append table1
as rate.`mysql_{db}.{table}`
options mode="Append"
and idCols="id"
and duration="5"
and syncType="binlog"
and checkpointLocation="/tmp/cpl-binlog2";
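The three binlog options together describe where in the binlog to start reading. As a rough sketch, assuming the prefix and index are combined the way MySQL itself names binlog files (base name plus a six-digit numeric extension), the starting position could be modelled as below. The `BinlogPosition` type and `position` helper are hypothetical and only illustrate the idea; how spark-binlog resolves these options internally is not shown in this article.

```scala
// Hypothetical helper for illustration; not part of spark-binlog.
object BinlogPositionSketch {
  // Every MySQL binlog file starts with a 4-byte magic header,
  // so the first event in a file begins at byte offset 4.
  val FirstEventOffset: Long = 4L

  final case class BinlogPosition(fileName: String, offset: Long)

  // Assumption: file name = "<prefix>.<index zero-padded to six digits>",
  // following MySQL's own naming scheme for binlog files.
  def position(prefix: String, index: Int, offset: Long): BinlogPosition = {
    require(offset >= FirstEventOffset, "offset must not point inside the file header")
    BinlogPosition(prefix + "." + "%06d".format(index), offset)
  }
}
```

Under that assumption, the options in the script above (prefix `mysql-bin`, index 4, offset 4) would correspond to the very first event of the file `mysql-bin.000004`.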

Because this is incremental sync, you first need to run one full sync. That is also easy with MLSQL:

connect jdbc where
url="jdbc:mysql://127.0.0.1:3306/mlsql_console?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false"
and driver="com.mysql.jdbc.Driver"
and user="xxxxx"
and password="xxxx"
as db_cool;

load jdbc.`db_cool.script_file` as script_file;
save overwrite script_file as delta.`mysql_mlsql_console.script_file`;

load delta.`mysql_mlsql_console.script_file` as output;

If you use the Console, you can run this directly in the editor.

If you have installed the binlog2delta plugin, you can also use its wizard to set this up.
