首先关于 camel 的基本概念和用法,以及 kafka 的基本概念和用法,这里就不啰嗦了。这篇文章假设你对两者都有基本的认识。html
camel 自己是一个路由引擎,经过 camel 你能够定义路由规则,指定从哪里(源)接收消息,如何处理这些消息,以及发往哪里(目标)。camel-kafka 就是 camel 的其中一个组件,它从指定的 kafka topic 获取消息来源进行处理。java
有些小伙伴可能有疑问了,kafka 自己不就是生产者-消费者模式吗?原生 kafka 发布消息,而后消费进行消息处理不就好了,为啥还用 camel-kafka 呢?git
首先恭喜你是一个爱思考的小伙伴!这个问题的答案是这样,camel 自己提供的是高层次的抽象,你能够选择从 kafka 做为源接收数据,也可使用其它组件,好比mq,文件等。camel 让你能使用相同的api和处理流程,处理不一样协议和数据类型的系统。github
全部总结下,(下面这句话很重要,读三遍)apache
camel实现了客户端与服务端的解耦, 生产者和消费者的解耦。api
好比咱们能够选择从kafka获取消息,而后发送到jms(activemq)。bash
from("kafka:test?brokers=localhost:9092") .to("jms:queue:test.mq.queue")
camel对每一个组件约定一个发送和接受的 endpoint uri,kafka 的uri格式是,app
kafka:topic[?options]
option中有不少选项,好比最重要的 brokers 指定 kafka 服务的地址。而后是uri的参数,相似http uri的参数格式。下面是个示例:ui
from("kafka:test?brokers=localhost:9200" + "&maxPollRecords=5000" + "&consumersCount=1" + "&seekTo=beginning" + "&groupId=kafkaGroup") .routeId("FromKafka") .to("stream:out");
每一个参数的意思我就不一一解释了,在文章最后我会给出官方的连接,你们能够本身去查阅。spa
说了这么多,咱们仍是运行一个程序看看效果。这个程序来自 apache camel 官方example,完整的代码在文章的最后有连接。
首先,pom引入依赖,
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-kafka</artifactId> <version>2.24.1</version> </dependency>
我须要在本地启动一个 kafka 的server,具体过程网上不少,这里不啰嗦了。惟一要注意的是 kafka server 的版本最好跟 camel-kafka 引入的 kafka-client 版本一致,以避免踩坑。
kafka环境安装好以后,建立两个topic,
bogon:kafka_2.11-2.2.0 ponyma$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic TestLog Created topic TestLog. bogon:kafka_2.11-2.2.0 ponyma$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic AccessLog Created topic AccessLog.
先来看看消费者部分的代码,
public static void main(String[] args) throws Exception { System.out.println("About to run Kafka-camel integration2..."); CamelContext camelContext = new DefaultCamelContext(); // Add route to send messages to Kafka camelContext.addRoutes(new RouteBuilder() { public void configure() { PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class); pc.setLocation("classpath:application.properties"); System.out.println("About to start route: Kafka Server -> Log "); from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}" + "&maxPollRecords={{consumer.maxPollRecords}}" + "&consumersCount={{consumer.consumersCount}}" + "&seekTo={{consumer.seekTo}}" + "&groupId={{consumer.group}}") .routeId("FromKafka") .to("stream:out"); } }); camelContext.start(); // let it run for 5 minutes before shutting down Thread.sleep(5 * 60 * 1000); camelContext.stop(); }
这个代码的核心就是camel的路由配置,也很简单,当前这个路由的意思是,从 kafka 某个 topic 读取数据,不作任何处理直接发送到标准输出。
再来看看生产者,
camelContext.addRoutes(new RouteBuilder() { public void configure() { PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class); pc.setLocation("classpath:application.properties"); // setup kafka component with the brokers KafkaComponent kafka = new KafkaComponent(); kafka.setBrokers("{{kafka.host}}:{{kafka.port}}"); camelContext.addComponent("kafka", kafka); from("direct:kafkaStart").routeId("DirectToKafka") .to("kafka:{{producer.topic}}").log(LoggingLevel.INFO, "${headers}"); // Topic can be set in header as well. from("direct:kafkaStartNoTopic").routeId("kafkaStartNoTopic") .to("kafka:dummy") .log(LoggingLevel.INFO, "${headers}"); // Use custom partitioner based on the key. /** * partitioner指定分区发送 */ from("direct:kafkaStartWithPartitioner").routeId("kafkaStartWithPartitioner") .to("kafka:{{producer.topic}}?partitioner={{producer.partitioner}}") .log("${headers}"); // Takes input from the command line. from("stream:in").setHeader(KafkaConstants.PARTITION_KEY, simple("0")) .setHeader(KafkaConstants.KEY, simple("1")).to("direct:kafkaStart"); } });
上面四个 from to 对应 下面四个发送的示例,经过日志打印咱们能够看看数据是否被正确的进行路由了。
headers.put(KafkaConstants.PARTITION_KEY, 0); headers.put(KafkaConstants.KEY, "1"); producerTemplate.sendBodyAndHeaders("direct:kafkaStart", testKafkaMessage, headers);
这段代码的意思是,生产者发送数据到 direct:kafkaStart 这个endpoint上, headers指定了全部的消息都会发送到 kafka topic 的第一个分区。
headers.put(KafkaConstants.KEY, "2"); headers.put(KafkaConstants.TOPIC, "TestLog"); producerTemplate.sendBodyAndHeaders("direct:kafkaStartNoTopic", testKafkaMessage, headers);
生产者发送数据到 direct:kafkaStartNoTopic 这个endpoint上,对应上面第二个 from to ,虽然没有指定发送目标的 kafka topic,可是咱们在 header 里指定了 topic,因此跟第一个 from to 其实能够达到一样的效果。
后面两个就不贴出代码了,一个是发送到分区0,一个发送到分区1。分区的原则是 header 里指定的key,分区器是自定义的,在源码 stringPartitioner.java 中。这里不表。
先启动消费者端,而后启动生产者端,结果以下:
能够看到,运行的结果跟咱们分析的是一致的。
本文所用的示例源码地址:
https://camel.apache.org/comp...
参考:
https://github.com/apache/cam...
欢迎你们关注个人公众号