- 移山是禧云自研的数据迁移平台,经过在移山的配置操做,能够方便的实现第三方数据接入、实时数据同步、异构数据源间迁移。
- 本文主要介绍移山数据迁移平台对异构数据源间迁移这块的实现思路和图形化配置实现流程。
能够理解为:指数据结构、存取方式、形式不同的多个数据源:前端
在创建数据仓库的过程当中,会有大量的 ETL(Extract、Transform、Load)工做,处理数据抽取、数据转换、数据装载的过程当中会有须要多种异构数据源(txt、Hbase、HDFS、Mysql、SqlServer等)迁移的场景。vue
数据中的一个字段被转移到目标数据字段中的一个过程。
好比:python
备注git
好比 数据报表系统 B 须要使用 业务系统 A 中的某一张表的数据用来作数据报表展示时,须要保证过多的数据查询、数据聚合处理不会影响该 系统 A 的正常业务。
解决数据湖(Hbase)中的数据迁移至数据仓库各层及关系型数据库之间的数据迁移问题。
因为要支持异构数据源的迁移,【移山】采用了在阿里内部被普遍使用的离线数据同步工具 DataX。github
DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各类异构数据源之间稳定高效的数据同步功能。sql
DataX自己做为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,归入到整个同步框架中。shell
DataX目前已经有了比较全面的插件体系,主流的 RDBMS 数据库、NOSQL、大数据计算系统都已经接入:
备注数据库
针对前面提到的两种场景,咱们开发了数据迁移功能,如下为【移山】中数据迁移的主要实现流程。json
Vue.js + Element UI + vue-socket + codemirror
SpringBoot
python + Flask + gunicorn + supervisor + Celery
拿建立一个 HbaseReader 插件为例,配置样例以下:浏览器
{ "name": "hbase11xreader", "parameter": { "hbaseConfig": { "hbase.rootdir": "hdfs://test/test1", "hbase.cluster.distributed": "true", "hbase.zookeeper.quorum": "", }, "table": "table1", "encoding": "utf-8", "mode": "normal", "column": [{ "name": "info:column1", "type": "string" }, { "name": "info:column2", "type": "string" }, { "name": "info:column3", "type": "string" } ], "range": { "startRowkey": "", "endRowkey": "", "isBinaryRowkey": true } } }
备注
移山平台展现插件模板的时候使用了 codemirror 在线代码编辑器,能够友好的展现 JSON 格式的模板数据,效果以下:
总结
在准备好了数据迁移任务所需使用的 Reader/Writer 插件后,就能够经过移山去建立迁移任务了,其实就是经过图形化配置去生成一个完整的 Job 配置文件,为运行任务作准备。
该步骤主要配置任务并发数,脏数据限制信息。
备注
备注
从下拉列表里选择要使用的 Writer 插件,一样配置内容会自动匹配显示,一样能够对配置内容进行编辑,这里再也不截图。
经过前面步骤的配置,完整的 Job 配置文件已经成型, 在任务保存以前,会将该配置文件完整的展现出来,方便使用者对以前步骤里的设置数据进行检查,格式以下:
3.1.1 创建Websocket 服务
点击【执行】按钮,前端会发出执行任务的请求,浏览器与 DataX 服务会经过 Websocket 服务创建链接,方便前端实时拿到 DataX 服务产生的任务执行结果:
import io from 'vue-socket.io' Vue.use(io, domain.socketUrl)
3.1.2 将Job配置数据传给 DataX 服务:
socket.emit('action', jobConfig);
3.2.1 接收数据,生成 json 配置文件
从 data 中取出 Job 配置数据,并将该数据写入到一个 .json 文件里:
job_config = data['job_config'] # 生成临时执行文件 file_name = '/tmp/' + str(job_id) + '.json' with open(file_name, 'w') as f: f.write(job_content) f.close()
3.2.2 拼接执行命令
command = DATAX\_ROOT + ' data.py ' + file\_name
3.2.3 执行任务
利用 subprocess 的 Popen 用法来执行命令:
child_process = subprocess.Popen( command, stdin=subprocess.PIPE, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=True)
3.2.4 经过 stdout 获取执行信息
while child_process.poll() is None: line = child_process.stdout.readline() line = line.strip() + '\n'
3.2.5 将执行结果传送给 Websocket 客户端
经过socket.emit('action',line)
方法,将日志传送给 Websocket 客户端(浏览器):
if line: emit('execute_info', line) if child_process.returncode == 0: emit('execute_info', '执行成功!') else: emit('execute_info', '执行失败!')
前端利用 socket.on 方法接收任务执行日志:
this.$socket.on('execute_info', (info) => { // TODO 拿到执行信息,并展现 })
在监控管理界面,能够对当天全部的迁移任务进行监控:
欢迎你们关注个人微信公众号阅读更多文章: