基于Tablestore Tunnel的数据复制实战

前言
数据复制主要指经过互联的网络在多台机器上保存相同数据的副本,经过数据复制方案,人们一般但愿达到如下目的:1)使数据在地理位置上更接近用户,进而下降访问延迟;2)当部分组件出现故障时,系统依旧能够继续工做,提升可用性;3)扩展至多台机器以同时提供数据访问服务,从而提高读吞吐量。
若是复制的数据一成不变,那么数据复制就很是容易,只须要将数据复制到每一个节点,一次性便可搞定,面对持续更改的数据如何正确而有效的完成数据复制是一个不小的挑战。git

使用DataX进行Tablestore数据复制
表格存储(Tablestore)是阿里云自研的NoSQL多模型数据库,提供海量结构化数据存储以及快速的查询和分析服务,表格存储的分布式存储和强大的索引引擎可以提供PB级存储、千万TPS以及毫秒级延迟的服务能力。DataX是阿里巴巴集团内被普遍使用的离线数据同步工具,DataX自己做为数据同步框架,将不一样数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件。
经过使用DataX能够完成Tablestore表的数据复制,以下图所示,otsreader插件实现了从Tablestore读取数据,并能够经过用户指定抽取数据范围可方便的实现数据增量抽取的需求,otsstreamreader插件实现了Tablestore的增量数据导出,而otswriter插件则实现了向Tablestore中写入数据。经过在DataX中配置Tablestore相关的Reader和Writer插件,便可以完成Tablestore的表数据复制。
图片描述github

使用通道服务进行Tablestore数据复制
通道服务(Tunnel Service)是基于表格存储数据接口之上的全增量一体化服务。通道服务为您提供了增量、全量、增量加全量三种类型的分布式数据实时消费通道。经过为数据表创建数据通道,能够简单地实现对表中历史存量和新增数据的消费处理。
图片描述
借助于全增量一体的通道服务,咱们能够轻松构建高效、弹性的数据复制解决方案。本文将逐步介绍如何结合通道服务进行Tablestore的数据复制,完整代码开源在github上的 tablestore-examples中。本次的实战将基于通道服务的Java SDK来完成,推荐先阅读下通道服务的相关文档,包括快速开始等。数据库

  1. 配置抽取

配置抽取其实对应的是数据同步所具有的功能,在本次实战中,咱们将完成指定时间点以前的表数据同步,指定的时间点能够是如今或者将来的某个时刻。具体的配置以下所示,ots-reader中记录的是源表的相关配置,ots-writer中记录的是目的表的相关配置。网络

{
"ots-reader": {负载均衡

"endpoint": "https://zhuoran-high.cn-hangzhou.ots.aliyuncs.com",
"instanceName": "zhuoran-high",
"tableName": "testSrcTable",
"accessId": "",
"accessKey": "",
"tunnelName": "testTunnel",
"endTime": "2019-06-19 17:00:00"

},
"ots-writer": {框架

"endpoint": "https://zhuoran-search.cn-hangzhou.ots.aliyuncs.com",
"instanceName": "zhuoran-search",
"tableName": "testDstTable",
"accessId": "",
"accessKey": "",
"batchWriteCount": 100

}
}
ots-reader中各参数的说明以下:curl

endpoint: Tablestore服务的Endpoint地址,例如https://zhuoran-high.cn-hangz...。在进行数据复制前,请检查下连通性(可使用curl命令)。
instanceName: Tablestore的实例名。
tableName: Tablestore的表名。
accessId: 访问Tablestore的云帐号accessId。
accessKey: 访问Tablestore的云帐号accessKey。
tunnelName: Tablestore的通道名,配置
endTime: 数据同步的截止时间点,对应到Java里SimpleFormat的格式为:yyyy-MM-dd HH:mm:ss 。
ots-writer中各参数的说明以下(略去相同的参数):分布式

batchWriteCount: Tablestore单次批量写入的条数,最大值为200。
注:将来会开放更多的功能配置,好比指定时间范围的数据复制等。ide

  1. 编写主逻辑

数据复制的主逻辑主要分为如下4步,在第一次运行时,会完整的进行全部步骤,而在程序重启或者断点续传场景时,只须要进行第3步和第4步。工具

建立复制目的表
经过使用DesribeTable接口,咱们能够获取到源表的Schema,借此能够建立出目的表,值得注意的是须要把目的表的有效版本误差设成一个足够大的值(默认为86400秒),由于服务端在处理写请求时会对属性列的版本号进行检查,写入的版本号须要在一个范围内才能写入成功,对于源表中的历史存量数据而言,时间戳每每是比较小的,会被服务端过滤掉,最终致使同步数据的丢失。
sourceClient = new SyncClient(config.getReadConf().getEndpoint(), config.getReadConf().getAccessId(),

config.getReadConf().getAccessKey(), config.getReadConf().getInstanceName());

destClient = new SyncClient(config.getWriteConf().getEndpoint(), config.getWriteConf().getAccessId(),

config.getWriteConf().getAccessKey(), config.getWriteConf().getInstanceName());

if (destClient.listTable().getTableNames().contains(config.getWriteConf().getTableName())) {

System.out.println("Table is already exist: " + config.getWriteConf().getTableName());

} else {

DescribeTableResponse describeTableResponse = sourceClient.describeTable(
    new DescribeTableRequest(config.getReadConf().getTableName()));
describeTableResponse.getTableMeta().setTableName(config.getWriteConf().getTableName());
describeTableResponse.getTableOptions().setMaxTimeDeviation(Long.MAX_VALUE / 1000000);
CreateTableRequest createTableRequest = new CreateTableRequest(describeTableResponse.getTableMeta(),
    describeTableResponse.getTableOptions(),
    new ReservedThroughput(describeTableResponse.getReservedThroughputDetails().getCapacityUnit()));
destClient.createTable(createTableRequest);
System.out.println("Create table success: " + config.getWriteConf().getTableName());

}
在源表上建立通道
使用通道服务的CreateTunnel接口能够建立通道,此处咱们建立全量加增量类型(TunnelType.BaseAndStream)类型的通道。
sourceTunnelClient = new TunnelClient(config.getReadConf().getEndpoint(), config.getReadConf().getAccessId(),

config.getReadConf().getAccessKey(), config.getReadConf().getInstanceName());

List<TunnelInfo> tunnelInfos = sourceTunnelClient.listTunnel(

new ListTunnelRequest(config.getReadConf().getTableName())).getTunnelInfos();

String tunnelId = null;
TunnelInfo tunnelInfo = getTunnelInfo(config.getReadConf().getTunnelName(), tunnelInfos);
if (tunnelInfo != null) {

tunnelId = tunnelInfo.getTunnelId();
System.out.println(String.format("Tunnel is already exist, TunnelName: %s, TunnelId: %s",
    config.getReadConf().getTunnelName(), tunnelId));

} else {

CreateTunnelResponse createTunnelResponse = sourceTunnelClient.createTunnel(
    new CreateTunnelRequest(config.getReadConf().getTableName(),
        config.getReadConf().getTunnelName(), TunnelType.BaseAndStream));
System.out.println("Create tunnel success: " + createTunnelResponse.getTunnelId());

}
启动定时任务来监测备份进度
备份进度的监测能够经过DesribeTunnel接口来完成,DescribeTunnel接口能够获取到最新消费到的时间点,经过和配置里的备份结束时间对比,咱们能够获取到当前同步的进度。在到达结束时间后,便可退出备份程序。
backgroundExecutor = Executors.newScheduledThreadPool(2, new ThreadFactory() {

private final AtomicInteger counter = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
    return new Thread(r, "background-checker-" + counter.getAndIncrement());
}

});
backgroundExecutor.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
    DescribeTunnelResponse resp = sourceTunnelClient.describeTunnel(new DescribeTunnelRequest(
        config.getReadConf().getTableName(), config.getReadConf().getTunnelName()
    ));
    // 已同步完成
    if (resp.getTunnelConsumePoint().getTime() > config.getReadConf().getEndTime()) {
        System.out.println("Table copy finished, program exit!");
        // 退出备份程序
        shutdown();
    }
}

}, 0, 2, TimeUnit.SECONDS);
启动数据复制
启动通道服务的自动化消费框架,开始自动化的数据同步,其中OtsReaderProcessor中完成的是源表数据的解析和目的表的写入,处理逻辑将会在后文中介绍。
if (tunnelId != null) {

sourceWorkerConfig = new TunnelWorkerConfig(
    new OtsReaderProcessor(config.getReadConf(), config.getWriteConf(), destClient));
sourceWorkerConfig.setHeartbeatIntervalInSec(15);
sourceWorker = new TunnelWorker(tunnelId, sourceTunnelClient, sourceWorkerConfig);
sourceWorker.connectAndWorking();

}

  1. 数据同步逻辑(OtsReaderProcessor)

使用通道服务,咱们须要编写数据的Process逻辑和Shutdown逻辑,数据同步中的核心在于解析数据并将其写入到目的表中,处理数据的完整代码以下所示,主要逻辑仍是比较清晰的,首先会检查数据的时间戳是否在合理的时间范围内,而后将StreamRecord转化为BatchWrite里对应的行,最后将数据串行写入到目的表中。

public void process(ProcessRecordsInput input) {

System.out.println(String.format("Begin process %d records.", input.getRecords().size()));
BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest();
int count = 0;
for (StreamRecord record : input.getRecords()) {
    if (record.getSequenceInfo().getTimestamp() / 1000 > readConf.getEndTime()) {
        System.out.println(String.format("skip record timestamp %d larger than endTime %d",
            record.getSequenceInfo().getTimestamp() / 1000, readConf.getEndTime()));
        continue;
    }
    count++;
    switch (record.getRecordType()) {
        case PUT:
            RowPutChange putChange = new RowPutChange(writeConf.getTableName(), record.getPrimaryKey());
            putChange.addColumns(getColumns(record));
            batchWriteRowRequest.addRowChange(putChange);
            break;
        case UPDATE:
            RowUpdateChange updateChange = new RowUpdateChange(writeConf.getTableName(),
                record.getPrimaryKey());
            for (RecordColumn column : record.getColumns()) {
                switch (column.getColumnType()) {
                    case PUT:
                        updateChange.put(column.getColumn());
                        break;
                    case DELETE_ONE_VERSION:
                        updateChange.deleteColumn(column.getColumn().getName(),
                            column.getColumn().getTimestamp());
                        break;
                    case DELETE_ALL_VERSION:
                        updateChange.deleteColumns(column.getColumn().getName());
                        break;
                    default:
                        break;
                }
            }
            batchWriteRowRequest.addRowChange(updateChange);
            break;
        case DELETE:
            RowDeleteChange deleteChange = new RowDeleteChange(writeConf.getTableName(),
                record.getPrimaryKey());
            batchWriteRowRequest.addRowChange(deleteChange);
            break;
        default:
            break;
    }

    if (count == writeConf.getBatchWriteCount()) {
        System.out.println("BatchWriteRow: " + count);
        writeClient.batchWriteRow(batchWriteRowRequest);
        batchWriteRowRequest = new BatchWriteRowRequest();
        count = 0;
    }
}

// 写最后一次的数据。
if (!batchWriteRowRequest.isEmpty()) {
    System.out.println("BatchWriteRow: " + count);
    writeClient.batchWriteRow(batchWriteRowRequest);
}

}

  1. 技术注解

如何保障备份性能?
备份过程分为全量(存量)和增量阶段,对于全量阶段,通道服务会自动将全表的数据在逻辑上划分红接近指定大小的若干分片,全量阶段的数据同步的总体并行度和分片数相关,可以有效的保障吞吐量。而对于增量阶段,为了保障数据的有序性,单分区内的数据咱们须要串行处理数据,增量阶段的性能和分区数成正比关系(增量同步性能白皮书),若是须要提速(增长分区)能够联系表格存储技术支持。
如何作到数据同步的水平扩展?
运行多个TunnelWorker(客户端)对同一个Tunnel进行消费时(TunnelId相同), 在TunnelWorker执行Heartbeat时,通道服务端会自动的对Channel(分区)资源进行重分配,让活跃的Channel尽量的均摊到每个TunnelWorker上,达到资源负载均衡的目的。同时,在水平扩展性方面,用户能够很容易的经过增长TunnelWorker的数量来完成,TunnelWorker能够在同一个机器或者不一样机器上。更多的原理能够参见数据消费框架原理介绍。
如何作到数据的最终一致性?
数据的一致性创建在通道服务的保序协议基础上,经过全量和增量数据同步的幂等性能够保障备份数据的最终一致。
如何完成断点续传功能?
通道服务的客户端会按期将已同步(消费)完成的数据的时间位点按期发送到服务端进行持久化,在发生Failover或者重启程序后,下一次的数据消费会从记录的checkpoint开始数据处理,不会形成数据的丢失。

将来展望
在本次的实战中,咱们结合通道服务完成一个简洁而有效的数据复制方案,实现了指定时间点的表数据复制。借助于本次的实战样例代码,用户仅须要配置源表和目的表的相关参数,便可以高效的完成的表数据的复制和数据的迁移。
在将来的演进中,通道服务还将支持建立指定时间段的通道,这样能够更加灵活的制定数据备份的计划,也能够完成持续备份和按时间点恢复等更加丰富的功能。

参考文献Desiging Data-Intensive Applications.

相关文章
相关标签/搜索