数据复制主要指经过互联的网络在多台机器上保存相同数据的副本,经过数据复制方案,人们一般但愿达到如下目的:1)使数据在地理位置上更接近用户,进而下降访问延迟;2)当部分组件出现故障时,系统依旧能够继续工做,提升可用性;3)扩展至多台机器以同时提供数据访问服务,从而提高读吞吐量。
若是复制的数据一成不变,那么数据复制就很是容易,只须要将数据复制到每一个节点,一次性便可搞定,面对持续更改的数据如何正确而有效的完成数据复制是一个不小的挑战。html
表格存储(Tablestore)是阿里云自研的NoSQL多模型数据库,提供海量结构化数据存储以及快速的查询和分析服务,表格存储的分布式存储和强大的索引引擎可以提供PB级存储、千万TPS以及毫秒级延迟的服务能力。DataX是阿里巴巴集团内被普遍使用的离线数据同步工具,DataX自己做为数据同步框架,将不一样数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件。
经过使用DataX能够完成Tablestore表的数据复制,以下图所示,otsreader插件实现了从Tablestore读取数据,并能够经过用户指定抽取数据范围可方便的实现数据增量抽取的需求,otsstreamreader插件实现了Tablestore的增量数据导出,而otswriter插件则实现了向Tablestore中写入数据。经过在DataX中配置Tablestore相关的Reader和Writer插件,便可以完成Tablestore的表数据复制。git
通道服务(Tunnel Service)是基于表格存储数据接口之上的全增量一体化服务。通道服务为您提供了增量、全量、增量加全量三种类型的分布式数据实时消费通道。经过为数据表创建数据通道,能够简单地实现对表中历史存量和新增数据的消费处理。github
借助于全增量一体的通道服务,咱们能够轻松构建高效、弹性的数据复制解决方案。本文将逐步介绍如何结合通道服务进行Tablestore的数据复制,完整代码开源在github上的 tablestore-examples中。本次的实战将基于通道服务的Java SDK来完成,推荐先阅读下通道服务的相关文档,包括快速开始等。数据库
配置抽取其实对应的是数据同步所具有的功能,在本次实战中,咱们将完成指定时间点以前的表数据同步,指定的时间点能够是如今或者将来的某个时刻。具体的配置以下所示,ots-reader中记录的是源表的相关配置,ots-writer中记录的是目的表的相关配置。网络
{ "ots-reader": { "endpoint": "https://zhuoran-high.cn-hangzhou.ots.aliyuncs.com", "instanceName": "zhuoran-high", "tableName": "testSrcTable", "accessId": "", "accessKey": "", "tunnelName": "testTunnel", "endTime": "2019-06-19 17:00:00" }, "ots-writer": { "endpoint": "https://zhuoran-search.cn-hangzhou.ots.aliyuncs.com", "instanceName": "zhuoran-search", "tableName": "testDstTable", "accessId": "", "accessKey": "", "batchWriteCount": 100 } }
ots-reader中各参数的说明以下:负载均衡
yyyy-MM-dd HH:mm:ss
。ots-writer中各参数的说明以下(略去相同的参数):框架
注:将来会开放更多的功能配置,好比指定时间范围的数据复制等。
数据复制的主逻辑主要分为如下4步,在第一次运行时,会完整的进行全部步骤,而在程序重启或者断点续传场景时,只须要进行第3步和第4步。curl
sourceClient = new SyncClient(config.getReadConf().getEndpoint(), config.getReadConf().getAccessId(), config.getReadConf().getAccessKey(), config.getReadConf().getInstanceName()); destClient = new SyncClient(config.getWriteConf().getEndpoint(), config.getWriteConf().getAccessId(), config.getWriteConf().getAccessKey(), config.getWriteConf().getInstanceName()); if (destClient.listTable().getTableNames().contains(config.getWriteConf().getTableName())) { System.out.println("Table is already exist: " + config.getWriteConf().getTableName()); } else { DescribeTableResponse describeTableResponse = sourceClient.describeTable( new DescribeTableRequest(config.getReadConf().getTableName())); describeTableResponse.getTableMeta().setTableName(config.getWriteConf().getTableName()); describeTableResponse.getTableOptions().setMaxTimeDeviation(Long.MAX_VALUE / 1000000); CreateTableRequest createTableRequest = new CreateTableRequest(describeTableResponse.getTableMeta(), describeTableResponse.getTableOptions(), new ReservedThroughput(describeTableResponse.getReservedThroughputDetails().getCapacityUnit())); destClient.createTable(createTableRequest); System.out.println("Create table success: " + config.getWriteConf().getTableName()); }
sourceTunnelClient = new TunnelClient(config.getReadConf().getEndpoint(), config.getReadConf().getAccessId(), config.getReadConf().getAccessKey(), config.getReadConf().getInstanceName()); List<TunnelInfo> tunnelInfos = sourceTunnelClient.listTunnel( new ListTunnelRequest(config.getReadConf().getTableName())).getTunnelInfos(); String tunnelId = null; TunnelInfo tunnelInfo = getTunnelInfo(config.getReadConf().getTunnelName(), tunnelInfos); if (tunnelInfo != null) { tunnelId = tunnelInfo.getTunnelId(); System.out.println(String.format("Tunnel is already exist, TunnelName: %s, TunnelId: %s", config.getReadConf().getTunnelName(), tunnelId)); } else { CreateTunnelResponse createTunnelResponse = sourceTunnelClient.createTunnel( new CreateTunnelRequest(config.getReadConf().getTableName(), config.getReadConf().getTunnelName(), TunnelType.BaseAndStream)); System.out.println("Create tunnel success: " + createTunnelResponse.getTunnelId()); }
backgroundExecutor = Executors.newScheduledThreadPool(2, new ThreadFactory() { private final AtomicInteger counter = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "background-checker-" + counter.getAndIncrement()); } }); backgroundExecutor.scheduleAtFixedRate(new Runnable() { @Override public void run() { DescribeTunnelResponse resp = sourceTunnelClient.describeTunnel(new DescribeTunnelRequest( config.getReadConf().getTableName(), config.getReadConf().getTunnelName() )); // 已同步完成 if (resp.getTunnelConsumePoint().getTime() > config.getReadConf().getEndTime()) { System.out.println("Table copy finished, program exit!"); // 退出备份程序 shutdown(); } } }, 0, 2, TimeUnit.SECONDS);
if (tunnelId != null) { sourceWorkerConfig = new TunnelWorkerConfig( new OtsReaderProcessor(config.getReadConf(), config.getWriteConf(), destClient)); sourceWorkerConfig.setHeartbeatIntervalInSec(15); sourceWorker = new TunnelWorker(tunnelId, sourceTunnelClient, sourceWorkerConfig); sourceWorker.connectAndWorking(); }
使用通道服务,咱们须要编写数据的Process逻辑和Shutdown逻辑,数据同步中的核心在于解析数据并将其写入到目的表中,处理数据的完整代码以下所示,主要逻辑仍是比较清晰的,首先会检查数据的时间戳是否在合理的时间范围内,而后将StreamRecord转化为BatchWrite里对应的行,最后将数据串行写入到目的表中。分布式
public void process(ProcessRecordsInput input) { System.out.println(String.format("Begin process %d records.", input.getRecords().size())); BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest(); int count = 0; for (StreamRecord record : input.getRecords()) { if (record.getSequenceInfo().getTimestamp() / 1000 > readConf.getEndTime()) { System.out.println(String.format("skip record timestamp %d larger than endTime %d", record.getSequenceInfo().getTimestamp() / 1000, readConf.getEndTime())); continue; } count++; switch (record.getRecordType()) { case PUT: RowPutChange putChange = new RowPutChange(writeConf.getTableName(), record.getPrimaryKey()); putChange.addColumns(getColumns(record)); batchWriteRowRequest.addRowChange(putChange); break; case UPDATE: RowUpdateChange updateChange = new RowUpdateChange(writeConf.getTableName(), record.getPrimaryKey()); for (RecordColumn column : record.getColumns()) { switch (column.getColumnType()) { case PUT: updateChange.put(column.getColumn()); break; case DELETE_ONE_VERSION: updateChange.deleteColumn(column.getColumn().getName(), column.getColumn().getTimestamp()); break; case DELETE_ALL_VERSION: updateChange.deleteColumns(column.getColumn().getName()); break; default: break; } } batchWriteRowRequest.addRowChange(updateChange); break; case DELETE: RowDeleteChange deleteChange = new RowDeleteChange(writeConf.getTableName(), record.getPrimaryKey()); batchWriteRowRequest.addRowChange(deleteChange); break; default: break; } if (count == writeConf.getBatchWriteCount()) { System.out.println("BatchWriteRow: " + count); writeClient.batchWriteRow(batchWriteRowRequest); batchWriteRowRequest = new BatchWriteRowRequest(); count = 0; } } // 写最后一次的数据。 if (!batchWriteRowRequest.isEmpty()) { System.out.println("BatchWriteRow: " + count); writeClient.batchWriteRow(batchWriteRowRequest); } }
在本次的实战中,咱们结合通道服务完成一个简洁而有效的数据复制方案,实现了指定时间点的表数据复制。借助于本次的实战样例代码,用户仅须要配置源表和目的表的相关参数,便可以高效的完成的表数据的复制和数据的迁移。
在将来的演进中,通道服务还将支持建立指定时间段的通道,这样能够更加灵活的制定数据备份的计划,也能够完成持续备份和按时间点恢复等更加丰富的功能。ide
本文为云栖社区原创内容,未经容许不得转载。