文 | 吕鹏 DataPipeline架构师数据库
进入大数据时代,实时做业有着愈来愈重要的地位。本文将从如下几个部分进行讲解DataPipeline在大数据平台的实时数据流实践。缓存
1、企业级数据面临的主要问题和挑战网络
1.数据量不断攀升多线程
随着互联网+的蓬勃发展和用户规模的急剧扩张,企业数据量也在飞速增加,数据的量以GB为单位,逐渐的开始以TB/GB/PB/EB,甚至ZB/YB等。同时大数据也在不断深刻到金融、零售、制造等行业,发挥着愈来愈大的做用。架构
2. 数据质量的要求不断地提高框架
当前比较流行的AI、数据建模,对数据质量要求高。尤为在金融领域,对于数据质量的要求是很是高的。工具
3. 数据平台架构的复杂化oop
企业级应用架构的变化随着企业规模而变。规模小的企业,用户少、数据量也小,可能只需一个MySQL就搞能搞;中型企业,随着业务量的上升,这时候可能须要让主库作OLTP,备库作OLAP;当企业进入规模化,数据量很是大,原有的OLTP可能已经不能知足了,这时候咱们会作一些策略,来保证OLTP和OLAP隔离,业务系统和BI系统分开互不影响,但作了隔离后同时带来了一个新的困难,数据流的实时同步的需求,这时企业就须要一个可扩展、可靠的流式传输工具。post
2、大数据平台上的实践案例性能
下图是一个典型的BI平台设计场景,以MySQL为例,DataPipeline是如何实现MySQL的SourceConnector。MySQL做为Source端时:
使用binlog时须要注意开启row 模式而且image设置为 full。
1. MySQL SourceConnector 全量+增量实时同步的实现
下面是具体的实现流程图,首先开启repeatable read事务,保证在执行读锁以前的数据能够确实的读到。而后进行flush table with read lock 操做,添加一个读锁,防止这个时候有新的数据进入影响数据的读取,这时开始一个truncation with snapshot,咱们能够记录当前binlog的offset 并标记一个snapshot start,这时的offset 为增量读取时开始的offset。当事务开始后能够进行全量数据的读取。record marker这时会将生成record 写到 kafka 中,而后commit 这个事务。当全量数据push完毕后咱们解除读锁而且标记snapshot stop,此时全量数据已经都进入kafka了,以后从以前记录的offset开始增量数据的同步。
2. DataPipeline作了哪些优化工做
1)以往在数据同步环节都分为全量同步和增量同步,全量同步为一个批处理。在批处理时咱们都是进行all or nothing的处理,但当大数据状况下一个批量会占用至关长的时间,时间越长可靠性就越难保障,因此每每会出现断掉的状况,这时一个从新处理会让不少人崩溃。DataPipeline 解决了这一痛点,经过管理数据传输时的position 来作到断点续传,这时当一个大规模的数据任务即便发生了意外,也能够重断掉的点来继续以前的任务,大大缩短了同步的时间,提升了同步的效率。
2)在同步多个任务的时候,很难平衡数据传输对源端的压力和目的端的实时性,在大数据量下的传输尤为可以体现,这时DataPipeline 在此作了大量相关测试来优化不一样的链接池,开放数据传输效率的自定义化,供客户针对本身的业务系统定制合适的传输任务,对于不一样种类的数据库的传输进行优化和调整,保证数据传输的高效性。
3)自定义异构数据类型的转化,每每开源类大数据传输工具如 sqoop 等,对异构数据类型的支持不够灵活,种类也不够齐全。像金融领域中对数据精度要求较高的场景,在传统数据库向大数据平台传输时形成的精度丢失是很大的一个问题。DataPipeline 对此作了更多数据类型的支持,好比hive 支持的复杂类型以及 decimal 和 timestamp 等。
3. Sink端之Hive
1)Hive的特性
2)Hive同步的问题
3)KafkaConnect HDFS 的 Hive 同步实践
4)Recover的机制
recover 是一种恢复的机制,在数据传输的阶段每每可能出现各类不一样的问题,如网络问题等等。当出现问题后咱们须要恢复数据同步,那么recover是怎么保证数据正常传输不丢失呢?当recover开始的时候,获取目标文件在hdfs 上的租约,若是这时候须要读写的HDFS当前文件是被占用的,那咱们须要等待它直到能够获取到租约。当咱们获取到租约后就能够开始读以前写入时候的log,若是第一次会建立一个新的log,并标记一个begin,而后记录了当时的kafka offset。这时候须要清理以前遗留下来的临时数据,清理掉以后再从新开始同步直到同步结束会标记一个end。若是没有结束的话就至关于正在进行中,正在进行中每次都会提交当前同步的offset,来保证出现意外后会回滚到以前offset。
5)WAL (Write-Ahead Logging)机制
Write-Ahead Logging机制其实就是核心思想在数据写入到数据库以前,它先写临时文件,当一个批次结束后,在将这个临时文件更名为正式文件,确保每次提交后的正式文件一致性,若是中途出现写入错误将临时文件删除从新写入,至关于一个回滚。hive 的同步主要利用这种实现方式来保证一致性。首先它同步数据写入到HDFS临时文件上,确保一个批次的数据正常后再重命名到正式文件当中。正式的文件名会包含kafka offset,例如一个avro 文件的文件名为 xxxx+001+0020.avro ,这表示当前文件中有offset 1 到 20 的20条数据。
4. Sink端之GreenPlum
GreenPlum,是一个MPP架构的数据仓库,底层由多个postgres数据库做为计算节点,擅长OLAP,做为BI数据仓库有着良好的性能。
1)DataPipeline对GreenPlum 同步实践以及优化策略
➢ 每一个须要同步的表单独记录一个offset,当整个任务失败时能够分开进行恢复;
➢ 使用一个线程池管理加载数据的线程,每一个同步的表单独一个线程来进行加载数据,多表同时同步;
➢ 在加载数据的时间里,提早对kafka进行消费,缓存处理好的一个数据集,当一个线程加载数据结束后立刻开始新的线程加载数据,减小处理加载数据的时间;
同步GreenPlum须要注意:由于是经过copy 写入文件的,须要文件是结构化数据,典型的是使用CSV,CSV 写入时需注意spiltquote,escapequote,避免出现数据错位的现象。update主键的问题 , 当源端是update一个主键时,同时须要记录update前的主键,并在目标端进行删除。还有 0 特殊字符的问题,由于核心是用C语言,因此在同步的时候0须要特殊处理掉。
3、DataPipeline将来的工做
1.目前咱们碰到kafka connect rebalance的一些问题,因此咱们对其进行了改造。以往的rebalance机制是假如咱们增长或者删除一个task,会致使整个集群rebalance,这样形成不少无谓的开销并且频繁的rebalance 不利于数据同步的任务的稳定。因而咱们将rebalance机制改形成一个黏性的机制:
2.源端的数据一致性,目前经过WAL的机制能够保证目的端的一致性;
3.大数据量下的同步优化以及提升同步的稳定性。
4、总结
—end—