最近在用Apache beam作流上的异常检测,期间遇到了不少问题,可是发现网上相关的资料不多,基本只能本身啃文档和瞎尝试。
因此想把本身踩过的坑记录下来,但愿能对你们有所帮助。
其中若有错漏,欢迎指出。java
顾名思义,是从kafka上读取数据到beam上或者将beam上的数据写入到kafka中。官方文档中没有直接的教程,要在GitHub上的源码中找到相关使用说明。
Github上的Kafka源码git
这里仅说明读数据部分。
maven依赖示例github
<dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-kafka</artifactId> <version>...</version> </dependency>
读数据示例shell
PCollection<KV<String,String>> lines = //这里kV后说明kafka中的key和value均为String类型 p.apply(KafkaIO.<String, String>read() .withBootstrapServers("hadoop1:9092, hadoop2:9092")//必需,设置kafka的服务器地址和端口 .withTopic("mytopic")//必需,设置要读取的kafka的topic名称 .withKeyDeserializer(StringDeserializer.class)//必需 .withValueDeserializer(StringDeserializer.class)//必需 .withMaxNumRecords(301) .withTimestampFn(new MyTimestampFunction()) .updateConsumerProperties(ImmutableMap.<String, Object>of("auto.offset.reset", "earliest")) .withoutMetadata() )
如下分别后面非必需的一些设置apache
1.设置最大记录条数服务器
.withMaxNumRecords(301)
经过这个函数,能够设置最大读取的记录条数。app
2.设置PCollection中元素对应的时间戳maven
.withTimestampFn(new MyTimestampFunction())
当不进行这个设置的时候,beam会根据当前的系统时间为每一个元素分配一个时间戳。
而有的时候,咱们但愿用kafka的数据中自身带有的时间戳来做为PCollection中元素的时间戳,从而进行后续的窗口操做。这时就须要经过上面的函数来达到这一目的。
其中MyTimestampFunction()是咱们自定义的一个函数,其要实现SerializableFunction<KV<String, String>, Instant>
这个接口。
即从一条kafka数据中得到时间戳,而后以Instant(org.joda.time.Instant)的格式返回。函数
public class MyTimestampFunction implements SerializableFunction<KV<String, String>, Instant> { public Instant apply(KV<String, String> input){ String[] temps = input.getValue().split(","); DateTime t = new DateTime(Long.valueOf(temps[1])); return t.toInstant(); } }
3.设置读kafka数据的顺序oop
updateConsumerProperties(ImmutableMap.<String, Object>of("auto.offset.reset", "earliest"))
KafkaIO默认的数据读取顺序是从最新的数据开始。当咱们开发测试的时候,若是没有一个生产者同步向kafka生产数据,那么这里就拿不到数据。(在这坑了好久,才发现这个缘由...)
当咱们想实现相似于kafka shell中的--from-beginning
的功能的时候,即从最先的数据开始读,就须要进行这一设置。
这里不只能够改变读取数据的顺序,按照相似的方式,还能够进行其余设置。
4.丢弃掉kafka中的附加信息
.withoutMetadata()
使用这一设置时,获得的PCollection中的元素是kafka的key和value组成的键值对。
当不使用其时,获得的PCollection中的元素是KafkaRecord。会附件不少元数据。
5.其余设置
// custom function for watermark (default is record timestamp) * .withWatermarkFn(new MyWatermarkFunction()) * * // restrict reader to committed messages on Kafka (see method documentation). * .withReadCommitted() *
在源码的使用说明中还提到另外的两个设置,但由于暂时没用到,这里就暂且省略了。