利用streamsets实现mysql到oracle的实时同步

Mysql创建streamsets库,新建一张stream_test表,主键为a
在这里插入图片描述

Oracle创建一个STREAMSETS库,新建一张STREAM_TEST表,主键为A
在这里插入图片描述

前提:mysql数据库开启binlog
示例:windows下开启binlog

1、查看binlog是否开启:show variables like ‘log_bin’
在这里插入图片描述

2、若为OFF则需开启binlog
在打开my.ini文件,在mysqld下面添加
log_bin=mysql-bin
binlog-format=ROW
server-id=1
在这里插入图片描述

3、保存文件,重启mysql服务
在这里插入图片描述

4、查看binlog状态:show variables like ‘log_bin’
在这里插入图片描述

上述步骤完成后进入系统后创建第一个Pipelines
在这里插入图片描述

查看是否存在JDBC组件,mysql日志记录组件,进入包管理器
在这里插入图片描述

如果组件已存在可直接进行管道流配置,如果不存在需安装所需组件.安装方式如下
在这里插入图片描述

由于网络状况可能出现安装失败的情况,如果安装失败可能会导致无法继续安装,此时需要删除已安装内容.
进入项目目录下,
进入 streamsets-libs 文件夹 # cd streamsets-libs
然后删除刚刚安装失败的组件包# rm -rf streamsets-datacollector-********
然后重新安装即可
安装完成后重启即可,再次进入包管理器即可看到组件已安装.即可进行pipelines管道流配置,在右侧选择你要使用的组件,单击,即可出现在左侧画布上
在这里插入图片描述

先依次将下图的中的组件拖到画布上 MySQL Binary Log,Stream Selector,JavaScript Evaluator,JDBC Producer,trash
在这里插入图片描述

完成之后开始进行配置
首先单击画布空白区域配置报错方式
在这里插入图片描述

设置日志存储路径、文件名、等待时间及文件大小
在这里插入图片描述

然后点击MySQL Binary Log组件,导入mysql驱动包,mysql5.7为mysql-connector-java-5.1.47.jar,点击upload并重启
在这里插入图片描述

重启完后继续点击MySQL Binary Log并按照下图设置,server ID为开启binlog时my.ini里填的ID,
在这里插入图片描述

输入账号密码
在这里插入图片描述

输入需要抓取日志的表
在这里插入图片描述

该组件配置完成,将箭头连接到Stream Selector并点击该组件,点击+号进行DML日志分流,依次输入${record:value(’/Type’)‘INSERT’}
${record:value(’/Type’)
‘DELETE’}
${record:value(’/Type’)==‘UPDATE’}
完成后将四个箭头连接相应的组件
在这里插入图片描述

之后点击JavaScript Evaluator 组件进行字段内容的抓取,按照下图设置
JS代码:(insert和update)
for(var i = 0; i < records.length; i++) {
try {
var newRecord = sdcFunctions.createRecord(true);
newRecord.value = records[i].value[‘Data’];
newRecord.value.Type = records[i].value[‘Type’];
newRecord.value.Database = records[i].value[‘Database’];
newRecord.value.Table = records[i].value[‘Table’];
log.info(records[i].value[‘Type’])
output.write(newRecord);
} catch (e) {
// Send record to error
error.write(records[i], e);
}
}
JS代码:(delete)
for(var i = 0; i < records.length; i++) {
try {
var newRecord = sdcFunctions.createRecord(true);
newRecord.value = records[i].value[‘OldData’];
newRecord.value.Type = records[i].value[‘Type’];
newRecord.value.Database = records[i].value[‘Database’];
newRecord.value.Table = records[i].value[‘Table’];
log.info(records[i].value[‘Type’])
output.write(newRecord);
} catch (e) {
// Send record to error
error.write(records[i], e);
}
}
在这里插入图片描述

最后点击JDBC Producer组件,首先导入目标库的驱动,本例为oracle,导入方式和MySQL Binary Log组件一样,oracle11g为ojdbc6.jar
导入之后按照下图配置,第二张图默认操作类型,根据上一组件依次配置相应的DML。
在这里插入图片描述
在这里插入图片描述

所有组件配置好之后,点击检测
在这里插入图片描述

检测成功之后,点击start启动Pipeline,下方有各种监控报表,此时可以尝试在mysql中进行dml操作,并查看监控结果及oracle相应表是否同步了数据
在这里插入图片描述