Apache Beam采坑系列——KafkaIO

最近在用Apache beam作流上的异常检测,期间遇到了不少问题,可是发现网上相关的资料不多,基本只能本身啃文档和瞎尝试。
因此想把本身踩过的坑记录下来,但愿能对你们有所帮助。
其中若有错漏,欢迎指出。java

KafkaIO

顾名思义,是从kafka上读取数据到beam上或者将beam上的数据写入到kafka中。官方文档中没有直接的教程,要在GitHub上的源码中找到相关使用说明。
Github上的Kafka源码git

这里仅说明读数据部分。
maven依赖示例github

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-kafka</artifactId>
    <version>...</version>
</dependency>

读数据示例shell

PCollection<KV<String,String>> lines = //这里kV后说明kafka中的key和value均为String类型
                p.apply(KafkaIO.<String, String>read()
                .withBootstrapServers("hadoop1:9092, hadoop2:9092")//必需,设置kafka的服务器地址和端口
                .withTopic("mytopic")//必需,设置要读取的kafka的topic名称
                .withKeyDeserializer(StringDeserializer.class)//必需
                .withValueDeserializer(StringDeserializer.class)//必需
                .withMaxNumRecords(301)
                .withTimestampFn(new MyTimestampFunction())
                .updateConsumerProperties(ImmutableMap.<String, Object>of("auto.offset.reset", "earliest"))
                .withoutMetadata()
        )

如下分别后面非必需的一些设置apache

1.设置最大记录条数服务器

.withMaxNumRecords(301)

经过这个函数,能够设置最大读取的记录条数。app

2.设置PCollection中元素对应的时间戳maven

.withTimestampFn(new MyTimestampFunction())

当不进行这个设置的时候,beam会根据当前的系统时间为每一个元素分配一个时间戳。
而有的时候,咱们但愿用kafka的数据中自身带有的时间戳来做为PCollection中元素的时间戳,从而进行后续的窗口操做。这时就须要经过上面的函数来达到这一目的。
其中MyTimestampFunction()是咱们自定义的一个函数,其要实现SerializableFunction<KV<String, String>, Instant>这个接口。
即从一条kafka数据中得到时间戳,而后以Instant(org.joda.time.Instant)的格式返回。函数

public class MyTimestampFunction implements SerializableFunction<KV<String, String>, Instant> {

    public Instant apply(KV<String, String> input){
        String[] temps = input.getValue().split(",");
        DateTime t = new DateTime(Long.valueOf(temps[1]));
        return t.toInstant();
    }
}

3.设置读kafka数据的顺序oop

updateConsumerProperties(ImmutableMap.<String, Object>of("auto.offset.reset", "earliest"))

KafkaIO默认的数据读取顺序是从最新的数据开始。当咱们开发测试的时候,若是没有一个生产者同步向kafka生产数据,那么这里就拿不到数据。(在这坑了好久,才发现这个缘由...)
当咱们想实现相似于kafka shell中的--from-beginning的功能的时候,即从最先的数据开始读,就须要进行这一设置。
这里不只能够改变读取数据的顺序,按照相似的方式,还能够进行其余设置。

4.丢弃掉kafka中的附加信息

.withoutMetadata()

使用这一设置时,获得的PCollection中的元素是kafka的key和value组成的键值对。
当不使用其时,获得的PCollection中的元素是KafkaRecord。会附件不少元数据。

5.其余设置

// custom function for watermark (default is record timestamp)
 *       .withWatermarkFn(new MyWatermarkFunction())
 *
 *       // restrict reader to committed messages on Kafka (see method documentation).
 *       .withReadCommitted()
 *

在源码的使用说明中还提到另外的两个设置,但由于暂时没用到,这里就暂且省略了。

相关文章
相关标签/搜索