最近一直忙着各类设计和文档,终于有时间来更新一点儿关于kafka的东西。以前有一篇文章讲述的是kafka Producer端的程序,也就是日志的生产者,这部分比较容易理解,业务系统将运行日志或者业务日志发送到broker中,由broker代为存储。那讲的是如何收集日志,今天要写的是如何获取日志,而后再作相关的处理。 前端
以前写过kafka是讲日志按照topic的形式存储,一个topic会按照partition存在同一个文件夹下,目录在config/server.properties中指定,具体的存储规则能够查看以前的文章: java
# The directory under which to store log files log.dir=/tmp/kafka-logs
Consumer端的目的就是为了获取log日志,而后作进一步的处理。在这里咱们能够将数据的处理按照需求分为两个方向,线上和线下,也能够叫实时和离线。实时处理部分相似于网站里的站短,有消息了立刻就推送到前端,这是一种对实时性要求极高的模式,kafka能够作到,固然针对站短这样的功能还有更好的处理方式,我主要将kafka线上消费功能用在了实时统计上,处理一些如实时流量汇总、各系统实时吞吐量汇总等。 ios
这种应用,通常采用一个consumer中的一个group对应一个业务,配合多个producer提供数据,以下图模
式: shell
采用这种方式处理很简单,采用官网上给的例子便可解决,只是因为版本的问题,代码稍做更改便可: api
package com.a2.kafka.consumer; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.Message; import kafka.message.MessageAndMetadata; public class CommonConsumer { public static void main(String[] args) { // specify some consumer properties Properties props = new Properties(); props.put("zk.connect", "192.168.181.128:2181"); props.put("zk.connectiontimeout.ms", "1000000"); props.put("groupid", "test_group"); // Create the connection to the cluster ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); Map<String, Integer> map=new HashMap<String,Integer>(); map.put("test", 2); // create 4 partitions of the stream for topic “test”, to allow 4 threads to consume Map<String, List<KafkaStream<Message>>> topicMessageStreams = consumerConnector.createMessageStreams(map); List<KafkaStream<Message>> streams = topicMessageStreams.get("test"); // create list of 4 threads to consume from each of the partitions ExecutorService executor = Executors.newFixedThreadPool(4); // consume the messages in the threads for(final KafkaStream<Message> stream: streams) { executor.submit(new Runnable() { public void run() { for(MessageAndMetadata<Message> msgAndMetadata: stream) { // process message (msgAndMetadata.message()) System.out.println(msgAndMetadata.message()); } } }); } } }
这是一个user level的API,还有一个low level的API能够从官网找到,这里就不贴出来了。这个consumer是底层采用的是一个阻塞队列,只要一有producer生产数据,那consumer就会将数据打印出来,这是否是十分符合实时性的要求。 网络
固然这里会产生一个很严重的问题,若是你重启一下上面这个程序,那你连一条数据都抓不到,可是你去log文件中明明能够看到全部数据都好好的存在。换句话说,一旦你消费过这些数据,那你就没法再次用同一个groupid消费同一组数据了。我已经把结论说出来了,要消费同一组数据,你能够采用不一样的group。 性能
简单说下产生这个问题的缘由,这个问题相似于transaction commit,在消息系统中都会有这样一个问题存在,数据消费状态这个信息到底存哪里。是存在consumer端,仍是存在broker端。对于这样的争论,通常会出现三种状况:
第一种状况是将消费的状态存储在了broker端,一旦消费了就改变状态,但会由于网络缘由少消费信息,第二种是存在两端,而且先在broker端将状态记为send,等consumer处理完以后将状态标记为consumed,但也有可能由于在处理消息时产生异常,致使状态标记错误等,而且会产生性能的问题。第三种固然是最好的结果。 fetch
Kafka解决这个问题采用high water mark这样的标记,也就是设置offset: 网站
Kafka does two unusual things with respect to metadata. First the stream is partitioned on the brokers into a set of distinct partitions. The semantic meaning of these partitions is left up to the producer and the producer specifies which partition a message belongs to. Within a partition messages are stored in the order in which they arrive at the broker, and will be given out to consumers in that same order. This means that rather than store metadata for each message (marking it as consumed, say), we just need to store the "high water mark" for each combination of consumer, topic, and partition. Hence the total metadata required to summarize the state of the consumer is actually quite small. In Kafka we refer to this high-water mark as "the offset" for reasons that will become clear in the implementation section.
因此在每次消费信息时,log4j中都会输出不一样的offset: ui
[FetchRunnable-0] INFO : kafka.consumer.FetcherRunnable#info : FetchRunnable-0 start fetching topic: test part: 0 offset: 0 from 192.168.181.128:9092 [FetchRunnable-0] INFO : kafka.consumer.FetcherRunnable#info : FetchRunnable-0 start fetching topic: test part: 0 offset: 15 from 192.168.181.128:9092除了采用不一样的groupid去抓取已经消费过的数据,kafka还提供了另外一种思路,这种方式更适合线下的操做,镜像。
经过一些配置,就能够将线上产生的数据同步到镜像中去,而后再由特定的集群区处理大批量的数据,这种方式能够采用low level的API按照不一样的partition和offset来抓取数据,以得到更好的并行处理效果。