转自 https://blog.csdn.net/qq_18581221/article/details/89766073node
在使用kafka时,大多数场景对于数据少许的不一致(重复或者丢失)并不关注,好比日志,由于不会影响最终的使用或者分析,可是在某些应用场景(好比业务数据),须要对任何一条消息都要作到精确一次的消费,才能保证系统的正确性,kafka并不提供准确一致的消费API,须要咱们在实际使用时借用外部的一些手段来保证消费的精确性,下面咱们介绍如何实现sql
这篇文章KafkaConsumer使用介绍、参数配置介绍了如何kafka具备两种提交offset(消费偏移量)方式,咱们在Kafka简介以及安装和使用可知每一个分区具有一offset记录消费位置,若是消费者一直处于正常的运行转态,那么offset将没有什么用处,由于正常消费时,consumer记录了本次消费的offset和下一次将要进行poll数据的offset起始位置,可是若是消费者发生崩溃或者有新的消费者加入消费者组,就会触发再均衡Rebalance,Rebalance以后,每一个消费者将会分配到新的分区,而消费者对于新的分区应该从哪里进行起始消费,这时候提交的offset信息就起做用了,提交的offset信息包括消费者组全部分区的消费进度,这时候消费者能够根据消费进度继续消费,提交offset提交自动提交是最不具肯定性的,因此要使用手动提交来控制offset数据库
/** * 手动提交offset * 实现至少一次的消费语义 at least once * 当手动提交位移失败,会重复消费数据 */ @Test public void testCommitOffset() { String topic = "first-topic"; String group = "g1"; Properties props = new Properties(); props.put("bootstrap.servers", "node00:9092,node03:9092"); //required props.put("group.id", group); //required props.put("enable.auto.commit", "false"); // 关闭自动提交 props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "latest"); //从最先的消息开始读取 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //required props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //required Consumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topic)); //订阅topic final int minBatchSize = 10; // 缓存 List<ConsumerRecord<String, String>> buffer = new ArrayList<>(minBatchSize); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); records.forEach(buffer::add); // 缓存满了才对数据进行处理 if (buffer.size() >= minBatchSize) { // 业务逻辑--插入数据库 // insertIntoDb(buffer); // 等数据插入数据库以后,再异步提交位移 // 经过异步的方式提交位移 consumer.commitAsync(((offsets, exception) -> { if (exception == null) { offsets.forEach((topicPartition, metadata) -> { System.out.println(topicPartition + " -> offset=" + metadata.offset()); }); } else { exception.printStackTrace(); // 若是出错了,同步提交位移 consumer.commitSync(offsets); } })); // 若是提交位移失败了,那么重启consumer后会重复消费以前的数据,再次插入到数据库中 // 清空缓冲区 buffer.clear(); } } } finally { consumer.close(); } }
代码实现apache
/** * 实现最多一次语义 * 在消费前提交位移,当后续业务出现异常时,可能丢失数据 */ @Test public void testAtMostOnce() { Properties props = new Properties(); props.put("enable.auto.commit", "false"); KafkaConsumer<String, String> kafkaConsumer = KafkaFactory.buildConsumer(props); kafkaConsumer.subscribe(Arrays.asList("first-topic")); try { while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(500); // 处理业务以前就提交位移 kafkaConsumer.commitAsync(); // 下面是业务逻辑 records.forEach(record -> { System.out.println(record.value() + ", offset=" + record.offset()); }); } } catch (Exception e) { } finally { kafkaConsumer.close(); } }
从kafka的消费机制,咱们能够获得是否可以精确的消费关键在消费进度信息的准确性,若是可以保证消费进度的准确性,也就保证了消费数据的准确性bootstrap
这里简单说明一下实现思路
1) 利用consumer api的seek方法能够指定offset进行消费,在启动消费者时查询数据库中记录的offset信息,若是是第一次启动,那么数据库中将没有offset信息,须要进行消费的元数据插入,而后从offset=0开始消费api
2) 关系型数据库具有事务的特性,当数据入库时,同时也将offset信息更新,借用关系型数据库事务的特性保证数据入库和修改offset记录这两个操做是在同一个事务中进行缓存
3) 使用ConsumerRebalanceListener来完成在分配分区时和Relalance时做出相应的处理逻辑异步
4) 要弄清楚的是,咱们在消费的时候,关闭了自动提交,咱们也没有经过consumer.commitAsync()手动提交咱们的位移信息,而是在每次启动一个新的consumer的时候,触发rebalance时,读取数据库中的位移信息,从该位移中开始读取partition的信息(初始化的时候为0),在没有出现异常的状况下,咱们的consumer会不断从producer读取信息,这个位移是最新的那个消息位移,并且会同时把这个位移更新到数据库中,可是,当出现了rebalance时,那么consumer就会从数据库中读取开始的位移。ide
表设计ui
create table kafka_info( topic_group_partition varchar(32) primary key, //主题+组名+分区号 这里冗余设计方便经过这个主键进行更新提高效率 topic_group varchar(30), //主题和组名 partition_num tinyint,//分区号 offsets bigint default 0 //offset信息 );
代码
/** * @Description: 实现Kafka的精确一次消费 * @author: HuangYn * @date: 2019/10/15 21:10 */ public class ExactlyOnceConsume { private final KafkaConsumer<String, String> consumer; private Map<TopicPartition, Long> tpOffsetMap; private List<ConsumerRecord> list; private JDBCHelper jdbcHelper = JDBCHelper.getInstance(); private String groupId; private String topic; public ExactlyOnceConsume(Properties props, String topic, String groupId) { this.consumer = KafkaFactory.buildConsumer(props); this.list = new ArrayList<>(100); this.tpOffsetMap = new HashMap<>(); this.groupId = groupId; this.topic = topic; this.consumer.subscribe(Arrays.asList(this.topic), new HandleRebalance()); } public void receiveMsg() { try { while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); if (!records.isEmpty()) { // 处理每一个partition的记录 records.partitions().forEach(tp -> { List<ConsumerRecord<String, String>> tpRecords = records.records(tp); // 记录加到缓存中 tpRecords.forEach(record -> { System.out.println("partition=" + record.partition() + ", offset= " + record.offset() + ", value=" + record.value()); list.add(record); }); // 将partition对应的offset加到map中, 获取partition中最后一个元素的offset, // +1 就是下一次读取的位移,就是本次须要提交的位移 tpOffsetMap.put(tp, tpRecords.get(tpRecords.size() - 1).offset() + 1); }); } // 缓存中有数据 if (!list.isEmpty()) { // 将数据插入数据库,而且将位移信息也插入数据库 // 所以,每次读取到数据,都要更新本consumer在数据库中的位移信息 boolean success = insertIntoDB(list, tpOffsetMap); if (success) { list.clear(); tpOffsetMap.clear(); } } } } catch (Exception e) { e.printStackTrace(); } finally { consumer.close(); } } private boolean insertIntoDB(List<ConsumerRecord> list, Map<TopicPartition, Long> tpOffsetMap) { // 这里应该是在同一个事务中进行的 // 为了方便就省略了 try { // TODO 将数据入库,这里省略了 // 将partition位移更新 String sql = "UPDATE kafka_info SET offsets = ? WHERE topic_group_partition = ?"; List<Object[]> params = new ArrayList<>(tpOffsetMap.size()); tpOffsetMap.forEach((tp, offset) -> { Object[] param = new Object[]{offset, topic + "_" + groupId + "_" + tp.partition()}; params.add(param); }); jdbcHelper.batchExecute(sql, params); return true; } catch (Exception e) { // 回滚事务 } } /** * rebalance触发的处理器 */ private class HandleRebalance implements ConsumerRebalanceListener { // rebalance以前触发 @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { //发生Rebalance时,只须要将list中数据和记录offset信息清空便可 //这里为何要清除数据,应为在Rebalance的时候有可能还有一批缓存数据在内存中没有进行入库, //而且offset信息也没有更新,若是不清除,那么下一次还会从新poll一次这些数据,将会致使数据重复 System.out.println("==== onPartitionsRevoked ===== "); list.clear(); tpOffsetMap.clear(); } // rebalance后调用,consumer抓取数据以前触发 @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { System.out.println("== onPartitionsAssigned =="); List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); // 从数据库读取当前partition的信息 Map<TopicPartition, Long> partitionOffsetMapFromDB = getPartitionOffsetMapFromDB(partitionInfos.size()); // 在分配分区时指定消费位置 for (TopicPartition partition : partitions) { // 指定consumer在每一个partition上的消费开始位置 // 若是在数据库中有对应partition的信息则使用,不然将默认从offset=0开始消费 if (partitionOffsetMapFromDB.get(partition) != null) { consumer.seek(partition, partitionOffsetMapFromDB.get(partition)); } else { consumer.seek(partition, 0L); } } } } /** * 从数据库读取offset信息 * * @param size * @return */ private Map<TopicPartition, Long> getPartitionOffsetMapFromDB(int size) { Map<TopicPartition, Long> partitionOffsetMapFromDB = new HashMap<>(); String sql = "SELECT partition_num, offsets FROM kafka_info WHERE topic_group = ?"; jdbcHelper.executeQuery(sql, new Object[]{topic + "_" + groupId}, resultSet -> { int partition_num = -1; long offsets = -1; while (resultSet.next()) { partition_num = resultSet.getInt("partition_num"); offsets = resultSet.getLong("offsets"); System.out.println("partition_num=" + partition_num + ", offset=" + offsets); partitionOffsetMapFromDB.put(new TopicPartition(topic, partition_num), offsets); } System.out.println("partitionOffsetMapFromDB.size = " + partitionOffsetMapFromDB.size()); //判断数据库是否存在全部的分区的信息,若是没有,则须要进行初始化 if (partitionOffsetMapFromDB.size() < size) { String insert = "INSERT INTO kafka_info (topic_group_partition,topic_group,partition_num) VALUES(?,?,?)"; List<Object[]> params = new ArrayList<>(); for (int p_num = 0; p_num < size; p_num++) { Object[] param = new Object[]{ topic + "_" + groupId + "_" + p_num, topic + "_" + groupId, p_num }; params.add(param); } jdbcHelper.batchExecute(insert, params); } }); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } return partitionOffsetMapFromDB; } }
数据库中记录