记一次OGG数据写入HBase的丢失数据缘由分析


1、现象

目前咱们的数据是经过OGG->Kafka->Spark Streaming->HBase。因为以前咱们发现HBase的列表put没法保证顺序,所以改了程序,若是是在同一个SparkStreaming的批次里面对同一条数据进行操做,则写入HBase的数据时间戳就很是相近,只会差几毫秒,若是是不一样批次则会差好几秒。此为背景。java

如今有一条数据,理应先删除再插入,可是结果变成了先插入再删除,结果以下apache

 
 
 
 
 
hbase(main):002:0> get 'XDGL_ACCT_PAYMENT_SCHEDULE','e5ad-***', {COLUMN=>'cf1:SQLTYPE',VERSIONS=>10}COLUMN CELL cf1:SQLTYPE timestamp=1498445308420, value=D cf1:SQLTYPE timestamp=1498445301336, value=I

其中,两条记录的时间戳换算过来正好相差了7秒
2017-06-26 10:48:21 I
2017-06-26 10:48:28 D oracle

很明显这两条数据并无在同一个批次获得处理,很明显Spark获取到数据的前后顺序出了点问题。dom

2、缘由排查

2.1 SparkStreaming程序排查

首先SparkStream接收到数据后根据数据的pos排序,而后再根据主键排序。从现象看,是SparkStreaming分了两个批次才拿到,而SparkStreaming从Kafka拿数据也是顺序拿的。那么出现问题的可能性就只有两个:
一、OGG发给Kafka的数据顺序是错误的。
二、OGG发给Kafka的数据顺序是正确的,可是发到了不一样的Kafka Partition。函数

2.2 Kafka数据验证

为了验证上面的两个猜测,我把kafka的数据再次获取出来进行分析。重点分析数据的partition、key、value。
获得的结果以下:this

能够看到数据的同一个表数据写到了不一样的分区,能够看到OGG的同一分区下的数据顺序是正确的。
正好说明2.1里面的第二个猜测。看来是OGG写入的时候并无按照数据的表名写入不一样的分区。spa

在OGG 文档
http://docs.oracle.com/goldengate/bd1221/gg-bd/GADBD/GUID-2561CA12-9BAC-454B-A2E3-2D36C5C60EE5.htm#GADBD449
中的 5.1.4 Kafka Handler Configuration 的属性 gg.handler.kafkahandler.ProducerRecordClass 里面提到了,默认使用的是oracle.goldengate.handler.kafka.DefaultProducerRecord这个类对表名进行分区的。若是要自定义的话须要实现CreateProducerRecord这个接口3d

原话是 The unit of data in Kafka - a ProducerRecord holds the key field with the value representing the payload. This key is used for partitioning a Kafka Producer record that holds change capture data. By default, the fully qualified table name is used to partition the records. In order to change this key or behavior, theCreateProducerRecord Kafka Handler Interface needs to be implemented and this property needs to be set to point to the fully qualified name of the custom ProducerRecord class.code

然而写入kafka的结果却不是这样子的。这点让人费解。看来咱们须要查看OGG的源代码。htm

2.3 查看OGG源码

在OGG的安装包里面有一个名叫ggjava/resources/lib/ggkafka-****.jar的文件,咱们将其导入一个工程以后就能够直接看到它的源代码了。

2.3.1 生成Kafka消息类

咱们直接查看oracle.goldengate.handler.kafka.DefaultProducerRecord这个类

 
 
 
 
 
public class DefaultProducerRecord implements CreateProducerRecord { public DefaultProducerRecord() { } public ProducerRecord createProducerRecord(String topicName, Tx transaction, Op operation, byte[] data, TxOpMode handlerMode) { ProducerRecord pr; if(handlerMode.isOperationMode()) { pr = new ProducerRecord(topicName, operation.getTableName().getOriginalName().getBytes(), data); } else { pr = new ProducerRecord(topicName, (Object)null, data); } return pr; }}

这个类只返回一个ProducerRecord,这个是用于发送给Kafka的一条消息。咱们先无论这个,继续看他是如何写给kafka的

2.3.2 Kafka配置类

首先是OGG与Kafka相关的配置类 oracle.goldengate.handler.kafka.impl.KafkaProperties 。这个类里面定义了一堆参数,咱们只须要关心partitioner.class这个参数,该参数用于定义写入Kafka的时候获取分区的类。很遗憾,这个类没有该参数配置。

2.3.3 Kafka 消息发送类

这里有一个抽象类oracle.goldengate.handler.kafka.impl.AbstractKafkaProducer,他有两个子类,分别叫BlockingKafkaProducerNonBlockingKafkaProducer (默认是NonBlockingKafkaProducer)
这两个类都是直接将经过producer对象将record发送给了kafka。所以想要指导Kafka的分区信息还须要看Kafka是怎么获取分区的。

2.3.4 Kafka 分区获取方式

进入kafka的producer发送record的函数

 
 
 
 
 
public Future<RecordMetadata> send(ProducerRecord<K, V> record) { return send(record, null); }public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { // intercept the record, which can be potentially modified; this method does not throw exceptions ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record); return doSend(interceptedRecord, callback); }

发送的方法在doSend里面,里面内容不少,请看我勾出来的这两段

因为写入的时候都没有对Record指定分区,所以这段代码的partition都为空。因此代码总会执行到 this.partitioner.partition(record.topic(), record.key(), serializedKey,record.value(), serializedValue,cluster)
该函数是kafka的Partitioner这个抽象类里面的

因为2.3.2 Kafka配置类中没有指定分区的class,所以只会使用Kafka默认的分区类org.apache.kafka.clients.producer.internals.DefaultPartitioner

private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
这里先是获取了一个随机值,而后再获取了Kafka中对应topic的可用分区列表,而后根据分区数和随机值进行取余获得分区数的值。

流程走到这里,咱们基本能够获得一个结论。

  • Kafka的record指定了分区,则会使用指定的分区写入;不然进行下一个判断;
  • Kafka根据本身定义的partitioner接口进行分区,若是没指定类,则使用默认的分区则进行下一个判断;
  • Kafka获取record中的key进行分区,若是key不为空,则使用Hash分区,若是为空,基本上就是随机分配分区了。

3、结论

事情到了这里,咱们能够判定,写入分区错乱的问题是由于gg.handler.kafkahandler.Mode是事务模式,致使多条消息一次发送了,没法使用表名做为key,OGG就用了null做为key发送给了Kafka,最终Kafka拿到空值以后只能随机发送给某个partition,因此才会出现这样的问题。

最终,修改了ogg的操做模式以后能够看到,写入的分区正常了。

相关文章
相关标签/搜索