Kafka Streams is Kafka's own stream-processing tool, shipped as a lightweight Java library.
WordCount example
package com.zhiwei.kafka.streams;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;

/**
 * WordCount example.
 */
public class WordCountApplication {

    public static void main(final String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "centos:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        /*
         * Source topic for the stream.
         * key: String
         * value: String
         */
        KStream<String, String> source = builder.stream("kafkaStreamSourceTopic");

        source
                // split each line into words, emitting one key-value record per word
                .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
                // group by the word itself, similar to the shuffle before a Hadoop MR reduce
                .groupBy((key, value) -> value)
                // count occurrences per word; null keys are ignored
                .count(Materialized.as("counts-store"))
                // turn the changelog KTable back into a KStream
                .toStream()
                // sink topic, key: String, value: Long
                .to("kafkaStreamTargetTopic", Produced.with(Serdes.String(), Serdes.Long()));

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
    }
}
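The example starts the topology but never closes it, so the application has no clean way to stop. A minimal sketch of the usual fix, replacing the bare streams.start() at the end of main() above (needs java.util.concurrent.CountDownLatch; the hook thread name is illustrative):

final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
    @Override
    public void run() {
        streams.close();   // stop all stream threads and release state stores
        latch.countDown(); // unblock main() so the JVM can exit
    }
});
try {
    streams.start();
    latch.await(); // block until the shutdown hook fires (e.g. on Ctrl-C)
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}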
Consumer
package com.zhiwei.kafka.streams;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Subscribes to the wordcount output topic.
 *
 * Note: the consumer's key/value deserializers must match the serdes the
 * Kafka Streams app used when writing, otherwise the output is garbled.
 */
@Slf4j
public class KafkaStreamMsgConsumer {

    public static void main(String[] args) {
        new KafkaStreamMsgConsumer().consume();
    }

    public void consume() {
        KafkaConsumer<String, Long> consumer = null;
        try {
            Properties props = new Properties();
            props.put("bootstrap.servers", "centos:9092");
            props.put("group.id", "myConsumerGroup");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");

            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("kafkaStreamTargetTopic"));

            while (true) {
                // poll with a short timeout instead of busy-spinning at 1 ms
                ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, Long> record : records) {
                    log.info("key: {}, value: {}", record.key(), record.value());
                }
            }
        } finally {
            if (consumer != null) {
                consumer.close();
            }
        }
    }
}
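For an end-to-end test, the source topic also needs input. A minimal producer sketch, assuming the same broker address and topic name as the examples above (the class name KafkaStreamMsgProducer and the sample sentence are illustrative):

package com.zhiwei.kafka.streams;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Sends lines of text to the wordcount source topic.
 */
public class KafkaStreamMsgProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "centos:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // each record value is one line of text; the streams app splits it into words
            producer.send(new ProducerRecord<>("kafkaStreamSourceTopic", "hello kafka streams hello world"));
            producer.flush();
        }
    }
}

With this running, each word's updated count appears on kafkaStreamTargetTopic and is logged by the consumer above.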