目前咱们的数据是经过OGG->Kafka->Spark Streaming->HBase。因为以前咱们发现HBase的列表put没法保证顺序,所以改了程序,若是是在同一个SparkStreaming的批次里面对同一条数据进行操做,则写入HBase的数据时间戳就很是相近,只会差几毫秒,若是是不一样批次则会差好几秒。此为背景。java
如今有一条数据,理应先删除再插入,可是结果变成了先插入再删除,结果以下apache
hbase(main):002:0> get 'XDGL_ACCT_PAYMENT_SCHEDULE','e5ad-***', {COLUMN=>'cf1:SQLTYPE',VERSIONS=>10}COLUMN CELL cf1:SQLTYPE timestamp=1498445308420, value=D cf1:SQLTYPE timestamp=1498445301336, value=I
其中,两条记录的时间戳换算过来正好相差了7秒
2017-06-26 10:48:21 I
2017-06-26 10:48:28 D oracle
很明显这两条数据并无在同一个批次获得处理,很明显Spark获取到数据的前后顺序出了点问题。dom
首先SparkStream接收到数据后根据数据的pos排序,而后再根据主键排序。从现象看,是SparkStreaming分了两个批次才拿到,而SparkStreaming从Kafka拿数据也是顺序拿的。那么出现问题的可能性就只有两个:
一、OGG发给Kafka的数据顺序是错误的。
二、OGG发给Kafka的数据顺序是正确的,可是发到了不一样的Kafka Partition。函数
为了验证上面的两个猜测,我把kafka的数据再次获取出来进行分析。重点分析数据的partition、key、value。
获得的结果以下:this
能够看到数据的同一个表数据写到了不一样的分区,能够看到OGG的同一分区下的数据顺序是正确的。
正好说明2.1里面的第二个猜测。看来是OGG写入的时候并无按照数据的表名写入不一样的分区。spa
在OGG 文档
http://docs.oracle.com/goldengate/bd1221/gg-bd/GADBD/GUID-2561CA12-9BAC-454B-A2E3-2D36C5C60EE5.htm#GADBD449
中的 5.1.4 Kafka Handler Configuration 的属性 gg.handler.kafkahandler.ProducerRecordClass 里面提到了,默认使用的是oracle.goldengate.handler.kafka.DefaultProducerRecord这个类对表名进行分区的。若是要自定义的话须要实现CreateProducerRecord这个接口3d
原话是 The unit of data in Kafka - a
ProducerRecord
holds the key field with the value representing the payload. This key is used for partitioning a Kafka Producer record that holds change capture data. By default, the fully qualified table name is used to partition the records. In order to change this key or behavior, theCreateProducerRecord
Kafka Handler Interface needs to be implemented and this property needs to be set to point to the fully qualified name of the customProducerRecord
class.code
然而写入kafka的结果却不是这样子的。这点让人费解。看来咱们须要查看OGG的源代码。htm
在OGG的安装包里面有一个名叫ggjava/resources/lib/ggkafka-****.jar
的文件,咱们将其导入一个工程以后就能够直接看到它的源代码了。
咱们直接查看oracle.goldengate.handler.kafka.DefaultProducerRecord
这个类
public class DefaultProducerRecord implements CreateProducerRecord { public DefaultProducerRecord() { } public ProducerRecord createProducerRecord(String topicName, Tx transaction, Op operation, byte[] data, TxOpMode handlerMode) { ProducerRecord pr; if(handlerMode.isOperationMode()) { pr = new ProducerRecord(topicName, operation.getTableName().getOriginalName().getBytes(), data); } else { pr = new ProducerRecord(topicName, (Object)null, data); } return pr; }}
这个类只返回一个ProducerRecord,这个是用于发送给Kafka的一条消息。咱们先无论这个,继续看他是如何写给kafka的
首先是OGG与Kafka相关的配置类 oracle.goldengate.handler.kafka.impl.KafkaProperties
。这个类里面定义了一堆参数,咱们只须要关心partitioner.class
这个参数,该参数用于定义写入Kafka的时候获取分区的类。很遗憾,这个类没有该参数配置。
这里有一个抽象类oracle.goldengate.handler.kafka.impl.AbstractKafkaProducer
,他有两个子类,分别叫BlockingKafkaProducer
和NonBlockingKafkaProducer
(默认是NonBlockingKafkaProducer)
这两个类都是直接将经过producer对象将record发送给了kafka。所以想要指导Kafka的分区信息还须要看Kafka是怎么获取分区的。
进入kafka的producer发送record的函数
public Future<RecordMetadata> send(ProducerRecord<K, V> record) { return send(record, null); }public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { // intercept the record, which can be potentially modified; this method does not throw exceptions ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record); return doSend(interceptedRecord, callback); }
发送的方法在doSend里面,里面内容不少,请看我勾出来的这两段
因为写入的时候都没有对Record指定分区,所以这段代码的partition都为空。因此代码总会执行到 this.partitioner.partition(record.topic(), record.key(), serializedKey,record.value(), serializedValue,cluster)
该函数是kafka的Partitioner这个抽象类里面的
因为2.3.2 Kafka配置类中没有指定分区的class,所以只会使用Kafka默认的分区类org.apache.kafka.clients.producer.internals.DefaultPartitioner
private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
这里先是获取了一个随机值,而后再获取了Kafka中对应topic的可用分区列表,而后根据分区数和随机值进行取余获得分区数的值。
流程走到这里,咱们基本能够获得一个结论。
事情到了这里,咱们能够判定,写入分区错乱的问题是由于gg.handler.kafkahandler.Mode
是事务模式,致使多条消息一次发送了,没法使用表名做为key,OGG就用了null做为key发送给了Kafka,最终Kafka拿到空值以后只能随机发送给某个partition,因此才会出现这样的问题。
最终,修改了ogg的操做模式以后能够看到,写入的分区正常了。