请使用0.9之后的版本:apache
Properties props = new Properties(); props.put("bootstrap.servers", "kafka01:9092,kafka02:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset","earliest"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); try{ while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } }finally{ consumer.close(); }
一、只须要配置kafka的server groupid autocommit 序列化 autooffsetreset(其中 bootstrap.server group.id key.deserializer value.deserializer 必须指定);bootstrap
二、用这些Properties构建consumer对象(KafkaConsumer还有其余构造,能够把序列化传进去);数组
三、subscribe订阅topic列表(能够用正则订阅Pattern.compile("kafka.*")session
使用正则必须指定一个listener subscribe(Pattern pattern, ConsumerRebalanceListener listener)); 能够重写这个接口来实现 分区变动时的逻辑。若是设置了enable.auto.commit = true 就不用理会这个逻辑。fetch
四、而后循环poll消息(这里的1000是超时设定,若是没有不少数据,也就等一秒);code
五、处理消息(打印了offset key value 这里写处理逻辑)。server
六、关闭KafkaConsumer(能够传一个timeout值 等待秒数 默认是30)。对象
bootstrap.server(最好用主机名不用ip kafka内部用的主机名 除非本身配置了ip)接口
deserializer 反序列化consumer从broker端获取的是字节数组,还原回对象类型。ip
默认有十几种:StringDeserializer LongDeserializer DoubleDeserializer。。
也能够自定义:定义serializer格式 建立自定义deserializer类实现Deserializer 接口 重写逻辑
除了四个必传的 bootstrap.server group.id key.deserializer value.deserializer
还有session.timeout.ms "coordinator检测失败的时间"
是检测consumer挂掉的时间 为了能够及时的rebalance 默认是10秒 能够设置更小的值避免消息延迟。
max.poll.interval.ms "consumer处理逻辑最大时间"
处理逻辑比较复杂的时候 能够设置这个值 避免形成没必要要的 rebalance ,由于两次poll时间超过了这个参数,kafka认为这个consumer已经跟不上了,会踢出组,并且不能提交offset,就会重复消费。默认是5分钟。
auto.offset.reset "无位移或者位移越界时kafka的应对策略"
因此若是启动了一个group从头消费 成功提交位移后 重启后仍是接着消费 这个参数无效
因此3个值的解释是:
earliset 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从最先的位移消费
latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
(注意kafka-0.10.1.X版本以前: auto.offset.reset 的值为smallest,和,largest.(offest保存在zk中) 、
咱们这是说的是新版本:kafka-0.10.1.X版本以后: auto.offset.reset 的值更改成:earliest,latest,和none (offest保存在kafka的一个特殊的topic名为:__consumer_offsets里面))
enable.auto.commit 是否自动提交位移
true 自动提交 false须要用户手动提交 有只处理一次须要的 最近设置为false本身控制。
fetch.max.bytes consumer单次获取最大字节数
max.poll.records 单次poll返回的最大消息数
默认500条 若是消费很轻量 能够适当提升这个值 增长消费速度。
hearbeat.interval.ms consumer其余组员感知rabalance的时间
该值必须小于 session.timeout.ms 若是检测到 consumer挂掉 也就根本没法感知rabalance了
connections.max.idle.ms 按期关闭链接的时间
默认是9分钟 能够设置为-1 永不关闭
更多实时计算,Kafka等相关技术博文,欢迎关注实时流式计算