移山(数据迁移平台)的数据迁移是怎么实现的

  1. 移山是禧云自研的数据迁移平台,经过在移山的配置操做,能够方便的实现第三方数据接入、实时数据同步、异构数据源间迁移。
  2. 本文主要介绍移山数据迁移平台对异构数据源间迁移这块的实现思路和图形化配置实现流程。

| 一. 什么是异构数据源

能够理解为:指数据结构、存取方式、形式不同的多个数据源:前端

  • 一个公司在信息化建设中,不一样时期、不一样背景、面对不一样的应用和客户会催生出多个系统;
  • 每一个系统可能会积累大量不一样存储方式的数据,从简单的 Excel 文件数据、txt 文本数据到复杂的关系型数据库 MYSQL、Oracle 数据等,它们构成了异构数据源。

| 二. 异构数据源迁移

离线数据仓库

  1. 为支撑集团运营发展和决策分析,禧云起建之初构建了完善的离线数据仓库体系;
  2. 从业务数据源抽取过来的数据存放在数据湖(Hbase)中;
  3. 数据仓库的数据存储为 HDFS,离线计算框架为 Hive,Spark。

数据迁移场景

在创建数据仓库的过程当中,会有大量的 ETL(Extract、Transform、Load)工做,处理数据抽取、数据转换、数据装载的过程当中会有须要多种异构数据源(txt、Hbase、HDFS、Mysql、SqlServer等)迁移的场景。vue

场景1: 简单的数据转换:字段水平的简单映射
数据中的一个字段被转移到目标数据字段中的一个过程。

好比:python

  • 业务数据库(Mysql)到数据湖(Hbase);
  • 从数据湖(Hbase)到数据仓库(HDFS);
  • 数据仓库(HDFS)到目标关系型数据库(Mysql)。

备注git

  • 复杂的数据转换
  • 须要作更多的数据分析,包括通用标识符问题、复杂条件过滤、时间类型的转换、去除重复记录等;
  • 此时能够借助魔盒(开发协做平台)来完成 Spark 计算任务的提交、工做流调度来实现;
  • 该类数据分析处理不在本文讨论范围,如有兴趣了解能够看这篇
场景2: 关系型数据库之间的数据迁移
好比 数据报表系统 B 须要使用 业务系统 A 中的某一张表的数据用来作数据报表展示时,须要保证过多的数据查询、数据聚合处理不会影响该 系统 A 的正常业务。

| 三. 移山数据迁移工具

1. 目的

解决数据湖(Hbase)中的数据迁移至数据仓库各层及关系型数据库之间的数据迁移问题。

2. 数据迁移工具

因为要支持异构数据源的迁移,【移山】采用了在阿里内部被普遍使用的离线数据同步工具 DataX。github

2.1 DataX简介

DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各类异构数据源之间稳定高效的数据同步功能。sql

2.2 DataX3.0框架设计

DataX自己做为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,归入到整个同步框架中。
datax1.jpgshell

  • Reader:数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer:数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于链接Reader和Writer,做为二者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
2.3 DataX3.0插件体系

DataX目前已经有了比较全面的插件体系,主流的 RDBMS 数据库、NOSQL、大数据计算系统都已经接入:
数据迁移能力.png
备注数据库

2.4 移山数据迁移功能对DataX的封装
  • DataX 配置过程比较复杂,而且只支持命令行方式执行;
  • 为下降使用难度,咱们将配置过程进行了图形化处理,采用 Python 的 Flask 框架进行封装,任务执行支持 HTTP 请求调用

| 四. 移山数据迁移实现流程

针对前面提到的两种场景,咱们开发了数据迁移功能,如下为【移山】中数据迁移的主要实现流程。json

技术栈

前端
Vue.js + Element UI + vue-socket + codemirror
移山服务端
SpringBoot
DataX 服务端
python + Flask + gunicorn + supervisor + Celery

1. 建立Reader/Writer插件

1.1 准备插件配置模板

拿建立一个 HbaseReader 插件为例,配置样例以下:浏览器

{
    "name": "hbase11xreader",
    "parameter": {
        "hbaseConfig": {
            "hbase.rootdir": "hdfs://test/test1",
            "hbase.cluster.distributed": "true",
            "hbase.zookeeper.quorum": "",
        },
        "table": "table1",
        "encoding": "utf-8",
        "mode": "normal",
        "column": [{
                "name": "info:column1",
                "type": "string"
            },
            {
                "name": "info:column2",
                "type": "string"
            },
            {
                "name": "info:column3",
                "type": "string"
            }
        ],
        "range": {
            "startRowkey": "",
            "endRowkey": "",
            "isBinaryRowkey": true
        }
    }
}

备注

1.2 保存配置

移山平台展现插件模板的时候使用了 codemirror 在线代码编辑器,能够友好的展现 JSON 格式的模板数据,效果以下:
hbase-reader.png
总结

  • 经过借助 codemirror 插件,JSON 模板能够高亮的显示,同时强大的语法检查功能也有助于解决在编辑过程当中产生的语法错误,从而下降了 DataX 的Reader/Writer 插件配置难度。

2. 建立数据迁移任务

在准备好了数据迁移任务所需使用的 Reader/Writer 插件后,就能够经过移山去建立迁移任务了,其实就是经过图形化配置去生成一个完整的 Job 配置文件,为运行任务作准备。

2.1 配置任务基本属性

该步骤主要配置任务并发数,脏数据限制信息。
create-task0.png

备注

  • 最大脏记录数:写入数据时可容许的最大脏记录数,超过该阈值时,程序会执行失败;
  • 脏数据占比:写入数据时可容许的最大脏数据占比,超过该阈值时,程序会执行失败。
2.2 使用Reader插件

create-task1.png
备注

  • 能够对自动匹配显示的模板内容进行修改,以适应你的需求;
2.3 使用Writer插件

从下拉列表里选择要使用的 Writer 插件,一样配置内容会自动匹配显示,一样能够对配置内容进行编辑,这里再也不截图。

2.4 信息确认

经过前面步骤的配置,完整的 Job 配置文件已经成型, 在任务保存以前,会将该配置文件完整的展现出来,方便使用者对以前步骤里的设置数据进行检查,格式以下:
job配置文件.png

3. 执行迁移任务

3.1 前端

3.1.1 创建Websocket 服务 
点击【执行】按钮,前端会发出执行任务的请求,浏览器与 DataX 服务会经过 Websocket 服务创建链接,方便前端实时拿到 DataX 服务产生的任务执行结果:

import io from 'vue-socket.io'
Vue.use(io, domain.socketUrl)

3.1.2 将Job配置数据传给 DataX 服务:

socket.emit('action', jobConfig);
3.2 DataX 服务端

3.2.1 接收数据,生成 json 配置文件 
从 data 中取出 Job 配置数据,并将该数据写入到一个 .json 文件里:

job_config = data['job_config']
# 生成临时执行文件
file_name = '/tmp/' + str(job_id) + '.json'
with open(file_name, 'w') as f:
    f.write(job_content)
    f.close()

3.2.2 拼接执行命令

command = DATAX\_ROOT + ' data.py ' + file\_name

3.2.3 执行任务
利用 subprocess 的 Popen 用法来执行命令:

child_process = subprocess.Popen(
                    command,
                    stdin=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    shell=True)

3.2.4 经过 stdout 获取执行信息

while child_process.poll() is None:
    line = child_process.stdout.readline()
    line = line.strip() + '\n'

3.2.5 将执行结果传送给 Websocket 客户端
经过socket.emit('action',line)方法,将日志传送给 Websocket 客户端(浏览器):

if line:
   emit('execute_info', line)

if child_process.returncode == 0:
   emit('execute_info', '执行成功!')
else:
   emit('execute_info', '执行失败!')

4. 移山查看任务的执行状况

4.1 接收执行信息

前端利用 socket.on 方法接收任务执行日志:

this.$socket.on('execute_info', (info) => {
  // TODO 拿到执行信息,并展现       
})
4.2 展现执行结果

execute-info.png

4.3 监控迁移任务

在监控管理界面,能够对当天全部的迁移任务进行监控:
迁移任务监控.png

五. 总结

  • 经过使用移山的数据迁移功能,数据开发人员在移山中能够无感知的使用 DataX 这个 ETL 工具,取代经过手动开发、命令行执行的传统方式;
  • 经过图形化配置,就可以快速的建立、执行一个数据迁移任务;
  • 经过监控管理功能,能够方便的监控任务的执行状况;
  • 大大的提升了数据开发人员的开发效率。

关注微信公众号

欢迎你们关注个人微信公众号阅读更多文章:
微信公众号二维码.jpg

相关文章
相关标签/搜索