【技术教程】MySQL to SequoiaDB数据迁移

1 背景

随着互联网的飞速发展,互联网的业务量呈爆发性增加,对于的数据量也迅速激增。传统的单机数据库在存储空间及性能的瓶颈,致使其将没法支撑企业业务的高速发展。伴随着海量数据对系统性能,成本以及扩展性的新需求,分布式数据库系统应运而生。sequoiadb做为是一款优秀的分布式文档型数据库,其底层基于分布式,高可用,高性能与动态数据类型设计的,可以应对海量数据的存储,及提供高效检索。java

传统数据库能够利用分布式数据库的优点来缓解其自身的瓶颈。好比,将历史数据迁移到sequoiadb,由sequoiadb提供存储及业务服务,以缓解传统数据库自身的压力。数据迁移分为全量迁移和增量迁移,本文主要对mysql到sequoiadb的增量数据迁移过程进行分析。mysql

2 场景介绍

本文将经过一个小案例来分析数据从mysql数据库抽取,并经由spark对数据进行清洗转换,最后装载到sequoiadb的迁移过程。sql

源数据mysql中有两张待迁移的数据表分别为student表和grade表,咱们须要对grade表进行增量迁移,每次只迁移更新的数据;此外,咱们须要一张由student和grade整合的大表,方便提供查询服务。数据库

具体方案为先经过select ...... into outfile ......方式从mysql数据库中导出数据为csv格式文件,指定导出编码为gb18030(此处模拟银行数据编码),经由java将文件编码由gb18030转为utf8编码类型;而后,针对不一样的数据,选择不一样的导入的方式。对于增量的数据能够调用sdbimprt和sdbupsert工具结合的方式来导入到SDB集群中,对于须要额外加工处理的数据能够利用spark的RDD特性进行处理并加装到sequoiadb集群中。json

3 数据迁移实现

数据迁移流程多线程

3.1 mysql数据抽取

mysql导出数据可使用mysqldump工具,也可用select into outfile的方式,此处使用select into outfile方式进行数据抽取,执行如下语句便可实现带条件抽取指定数据到指定目录下。其中参数character 指定为gb18030 编码,fields 分隔符为",",lines 分隔符为'\n'。并发

select * from student where id < 51 into outfile '/data/hbh/student.csv' character set gb18030 fields terminated by ',' lines terminated by '\n';

select * from grade into outfile '/tmp/grade.csv' character set gb18030 fields terminated by ',' lines terminated by '\n';

在执行select into outfile语句时,应确保具备MySQL的FILE权限,不然会遇到权限问题;而且输出文件必须不存在。这能够防止MySQL破坏重要的文件。app

 

3.2 数据迁移准备

咱们迁移的目标数据库为sequoiadb,此案例中咱们根据sequoiadb分布式的优点,结合它的数据分区特性,采用多维分区的方式将海量数据分散到多个数据分区组上进行存储。该方式经过结合了Hash分布方式和Partition分布方式的优势,让集合中的数据以更小的颗粒度分布到数据库多个数据分区组上,使得数据库的性能获得极大提高。dom

1) 对抽取出来的数据进行转码分布式

利用Linux自带的转码工具iconv进行转码,将编码由gb18030 转为utf8

iconv -t utf-8 -f gb2312 -c filename > newFilename

-f  原编码

-t  目标编码

-c 忽略没法转换的字符

 

2) 建立目标表(主子表模式)

//建立主表

sdb 'db.createCS("test")'

db.test.createCL("clname",{ShardingKey:{_id:1},ShardingType:"range",IsMainCL:true})

//建立子表

db.createCS("clname1",{Domain:"test_domain"}).createCL("clname1",{ShardingKey:{"_id:1"},ShardingType:"hash",Compressed:true,CompressionType:"lzw",AutoSplit:true})

//挂载

db.test.clname.attachCL("clname1.clname1",{LowBound:{_id:MinKey()},UpBound:{_id:MaxKey()}})'

//对主键创建惟一索引

db.test.clname.createIndex("id_Idx",{id:1},true,true)

 

3.3 增量数据迁移

3.3.1 sdbupsert更新迁移

增量的数据分为新增数据和更新数据,对于新增数据能够根据import工具直接导入,对于更新数据能够利用sdbupsert工具进行更新插入。sdbimprt工具导入数据时,对于更新后的数据而不是新插入的数据,在有惟一性约束的状况下就会出现键值重复(Duplicate Key)的异常,而且这些异常数据会以Json格式记录在.rec结尾的文件中(与导入的数据文件在同一目录下)。sdbupsert的功能就是将这些键值重复的数据upsert到SDB数据库中。而且支持多线程并发导入。

 

增量迁移

1) 使用sdbimprt导入工具进行导入迁移

sdbimprt --hosts="server1:11810,server2:11810,server3:11810" --delfield="\44" --type=csv --file=clname.csv -c test -l clname --fields='id string, user_id string,project string, score double, create_time string' -n 1000 -j 100

 

2) 使用sdbupsert进行更新迁移

对于键值重复的数据sdbimprt会记录在.rec结尾的文件,使用sdbupert进行更新导入。

java -jar sdbupsert.jar sdb.properties clname.rec

参数说明:

sdb.properties: 配置文件,含有调控参数,及链接参数配置

clname.rec:需导入的数据文件,json格式类型

 

3.3.2 表合并操做

在数据库的常见模型中(好比星型模型或者雪花模型),表通常分为两种:事实表和维度表。维度表通常指固定的、变更较少的表,例如联系人、物品种类等,通常数据有限。而事实表通常记录流水,好比销售清单等,一般随着时间的增加不断膨胀。这里的student表相似于维度表,grade表相似于事实表。对于这种大小表关联的join操做,sparkSQL有两种实现分为Broadcast Join和Shuffle Hash Join。在SparkSQL中,对两个表作Join最直接的方式是先根据key分区,再在每一个分区中把key值相同的记录拿出来作链接操做。但这样就不可避免地涉及到shuffle,而shuffle在Spark中是比较耗时的操做,Broadcast Join为了不shuffle,咱们能够将大小有限的维度表的所有数据分发到每一个节点上,供事实表使用。executor存储维度表的所有数据,必定程度上牺牲了空间,换取shuffle操做大量的耗时。可是这种方式只能用于广播较小的表,不然数据的冗余传输就远大于shuffle的开销。当维度表较大时,可使用Shuffle Hash Join方式,利用key相同必然分区相同的这个原理,SparkSQL将较大表的join分而治之,先将表划分红n个分区,再对两个表中相对应分区的数据分别进行Hash Join,这样即在必定程度上减小了driver广播一侧表的压力,也减小了executor端取整张被广播表的内存消耗。

Spark join同步数据

 

Spark做为一个分布式的计算引擎,能够经过分区的形式将大批量的数据划分红n份较小的数据集进行并行计算。咱们利用其这种特性能够进行大批量的数据的join关联操做,并落地到sequoiadb数据库中。

将须要作join的两张表数据分别加载出来作成dataframe,而后建立为临时视图进行join操做,以后将没份结果集插入到sequoiadb中。

val db= new Sequoiadb("192.168.5.188:11810","","")
val stu_grade=db.getCollectionSpace("test").getCollection("stu_grade")



import spark.implicits._

val spark = SparkSession
       .builder()
       .appName("SparkSQL_join")
       .config("spark.some.config.option", "some-value")
       .getOrCreate()

    
    val user_array = Array(StructField("id", StringType, nullable = true),StructField("name", StringType, nullable = true),StructField("age", IntegerType, nullable = true))
    val user_schema = (StructType(user_array))
    val user_df = spark.read.schema(user_schema).csv("user.csv")
    user_df.createTempView("user_tbl");
    val grade_array = Array(StructField("id", StringType, nullable = true),StructField("user_id", StringType, nullable = true),StructField("project", StringType, nullable = true),StructField("grade", IntegerType, nullable = true),StructField("create_time", IntegerType, nullable = true))
    val grade_schema = (StructType(grade_array))
    val grade_df = spark.read.schema(grade_schema).csv("grade.csv")
    grade_df.createTempView("grade_tbl")
    val user_grade = spark.sql("select g.id,u.name,g.project,g.grade,g.create_time from user_tbl u right join grade_tbl g on u.id=g.user_id")

    user_grade.foreachPartition(partitionOfRecords => {
      partitionOfRecords.foreach(pair => {
        val id = pair.get(0)
        val name = pair.get(1)
        val project = pair.get(2)
        val grade = pair.get(3)
        val create_time = pair.get(4);
         stu_grade.insert("{'id':'"+id+"','name':'"+name+"','project':'"+project+"','grade':'"+grade+"','create_time':"+create_time+ "}")
      })

 

4 结论

数据迁移是系统开发常常涉及到的一项工做。在企业级应用系统中,新系统的开发,新旧系统的升级换代,以及正常的系统维护,不可避免地涉及到大量的迁移工做。数据迁移看视简单却蕴含许多技术实现细节。本文粗劣的介绍了数据迁移的整个流程,及一些迁移的方式,还有许多迁移的细节,包括数据的检查,迁移先后数据一致性等还没有涉及。

 

相关文章
相关标签/搜索