Kafka producer callback timeout error when sending messages

Kafka version: 0.10.1.1

After the producer sends a message, the following error appears:

 The producer has a error:Expiring 1 record(s) for testtopic-0 due to 30039 ms has passed since last append

Changing timeout.ms, batch.size, and similar settings had no effect; the error persisted.

After deleting the topic and its data and recreating the topic from the command line, the problem was resolved.

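Cause: when a producer sends to a topic that does not exist, Kafka creates the topic automatically, and in that case the error above can occur. Create the topic from the command line first, and only then use it.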

 

Reference article:

The sample code is written against kafka-clients V0.10.0.0 (http://m.blog.csdn.net/article/details?id=51577117)

1. Writing the producer

Step 1: Use the ./kafka-topics.sh command to create the topic and set its number of partitions

./kafka-topics.sh --create --zookeeper "172.16.49.173:2181" --topic "producer_test" --partitions 10 --replication-factor 3

Step 2: Implement the org.apache.kafka.clients.producer.Partitioner interface to provide custom message partitioning

import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyPartition implements Partitioner {
    private static Logger LOG = LoggerFactory.getLogger(MyPartition.class);
    public MyPartition() {
        // TODO Auto-generated constructor stub
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // TODO Auto-generated method stub

    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
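        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (key == null) {
            // Assumption: records without a key go to partition 0.
            return 0;
        }
        int partitionNum;
        try {
            // Numeric keys are used directly as the partition number.
            partitionNum = Integer.parseInt(key.toString());
        } catch (NumberFormatException e) {
            // Non-numeric keys fall back to their hash code.
            partitionNum = key.hashCode();
        }
        LOG.info("the message sendTo topic:" + topic + " and the partitionNum:" + partitionNum);
        // The remainder lies in (-numPartitions, numPartitions), so abs() cannot overflow.
        return Math.abs(partitionNum % numPartitions);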
    }

    @Override
    public void close() {
        // TODO Auto-generated method stub

    }

}
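The key-to-partition rule used by the partitioner can be tried without a running cluster. The following is a minimal sketch, assuming the same rule as above (numeric keys are taken as the partition number, other keys fall back to their hash code). The class name PartitionChoiceSketch, the method choosePartition, and the choice to send null keys to partition 0 are illustrative assumptions and are not part of Kafka.

```java
final class PartitionChoiceSketch {

    // Hypothetical helper: maps a record key to a partition index in [0, numPartitions).
    static int choosePartition(Object key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (key == null) {
            return 0; // assumption: keyless records go to partition 0
        }
        int candidate;
        try {
            // Numeric keys are used directly as the partition number.
            candidate = Integer.parseInt(key.toString());
        } catch (NumberFormatException e) {
            // Anything else falls back to the key's hash code.
            candidate = key.hashCode();
        }
        // The remainder lies in (-numPartitions, numPartitions), so abs() is safe here.
        return Math.abs(candidate % numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(choosePartition("13", 10));   // 3
        System.out.println(choosePartition("-7", 10));   // 7
        System.out.println(choosePartition("order-a", 10));
    }
}
```

Note that a key such as "2223132132" does not fit in an int, so it takes the hash-code branch rather than the numeric one.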

Step 3: Write the producer

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PartitionTest {
    private static Logger LOG = LoggerFactory.getLogger(PartitionTest.class);

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.49.173:9092,172.16.49.173:9093");

        props.put("retries", 0);
        // props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        // props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("partitioner.class", "com.goodix.kafka.MyPartition");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("producer_test", "2223132132",
                "test23_60");
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                // TODO Auto-generated method stub
                if (e != null)
                    LOG.error("the producer has a error:" + e.getMessage());
                else {
                    LOG.info("The offset of the record we just sent is: " + metadata.offset());
                    LOG.info("The partition of the record we just sent is: " + metadata.partition());
                }

            }
        });
        // close() blocks until previously sent records have completed,
        // so the callback has run by the time main() returns.
        producer.close();

    }

}

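Note: create the topic and its partitions from the command line first. Otherwise, when the custom partitioner yields a partition number greater than 1, sending to Kafka fails with the error: expired due to timeout while requesting metadata from brokers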

2. Writing a consumer with the Old Consumer High Level API

Step 1: Write the class that processes the messages

import java.io.UnsupportedEncodingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;


public class Consumerwork implements Runnable {
    private static Logger LOG = LoggerFactory.getLogger(Consumerwork.class);
     @SuppressWarnings("rawtypes")
    private KafkaStream m_stream;
     private int m_threadNumber;
     @SuppressWarnings("rawtypes")
    public Consumerwork(KafkaStream a_stream,int a_threadNumber) {
        // TODO Auto-generated constructor stub
         m_threadNumber = a_threadNumber;
         m_stream = a_stream;
    }

    @SuppressWarnings("unchecked")
    @Override
    public void run() {
        // TODO Auto-generated method stub
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
           while (it.hasNext())
                try {
                    MessageAndMetadata<byte[], byte[]> thisMetadata=it.next();
                    String jsonStr = new String(thisMetadata.message(),"utf-8") ;
                    LOG.info("Thread " + m_threadNumber + ": " +jsonStr);
                    LOG.info("partition:" + thisMetadata.partition() + ",offset:" + thisMetadata.offset());
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                } catch (UnsupportedEncodingException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
    }
}

Step 2: Write the main class that starts the consumer

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class ConsumerGroup {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;
    private static Logger LOG = LoggerFactory.getLogger(ConsumerGroup.class);
    public ConsumerGroup(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }
    public static void main(String[] args) {
        Scanner sc = new Scanner(System.in);
        System.out.println("Enter the ZooKeeper cluster address (e.g. zk1:2181,zk2:2181,zk3:2181):");
        String zooKeeper = sc.nextLine();
        System.out.println("Enter the consumer group name:");
        String groupId = sc.nextLine();
        System.out.println("Enter the topic to consume:");
        String topic = sc.nextLine();
        System.out.println("Enter the number of consumer threads:");
        int threads = sc.nextInt();
        LOG.info("Starting consumer kafka messages with zk:" + zooKeeper + " and the topic is " + topic);
        ConsumerGroup example = new ConsumerGroup(zooKeeper, groupId, topic);
        example.run(threads);

        try {
            Thread.sleep(1000);
        } catch (InterruptedException ie) {

        }
        // example.shutdown();
    }

    private void shutdown() {
        if (consumer != null)
            consumer.shutdown();
        if (executor != null) {
            executor.shutdown();
            try {
                // Give the worker threads up to five seconds to finish.
                if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                    LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
                }
            } catch (InterruptedException e) {
                LOG.info("Interrupted during shutdown, exiting uncleanly");
            }
        }
    }

    private void run(int a_numThreads) {
        // TODO Auto-generated method stub
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // now launch all the threads
        //
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        //
        int threadNumber = 0;
        LOG.info("the streams size is "+streams.size());
        for (final KafkaStream stream : streams) {
            executor.submit(new com.goodix.kafka.oldconsumer.Consumerwork(stream, threadNumber));
    //      consumer.commitOffsets();
            threadNumber++;
        }

    }

    private ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "60000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
//      props.put("rebalance.max.retries", "5");
//      props.put("rebalance.backoff.ms", "15000");
        return new ConsumerConfig(props);
    }

}

1. topicCountMap.put(topic, new Integer(a_numThreads)) tells Kafka how many threads will process messages for the topic.

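(1). The thread count should be less than or equal to the topic's partition count; the partitions can be inspected with ./kafka-topics.sh --describe --zookeeper "172.16.49.173:2181" --topic "producer_test"
(2). Kafka assigns partitions to threads according to the strategy named by partition.assignment.strategy. It is not configured here, so the default range strategy applies (partitions are split into contiguous ranges, as evenly as possible). For example, with 10 partitions and 3 threads, thread 1 consumes 0,1,2,3, thread 2 consumes 4,5,6, and thread 3 consumes 7,8,9. The other strategy is roundrobin; the official documentation lists two preconditions for using it, so it is generally best left unset.
(3). In testing, consumerMap.get(topic).size() appeared to be the number of partitions of the topic that currently hold data
(4). A stream carries messages from one or more partitions on one or more servers, and each stream is processed by a single thread, so the client can create as many streams as it needs. In short, one stream may aggregate partitions from several servers, but each partition goes to exactly one stream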
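The range example in (2) can be reproduced with a little arithmetic. This is a minimal sketch of a range-style split, not Kafka's own assignor code; the class RangeAssignmentSketch and the method assign are hypothetical names. Each thread receives a contiguous block of partitions, and the first (numPartitions % numThreads) threads receive one extra partition.

```java
import java.util.ArrayList;
import java.util.List;

final class RangeAssignmentSketch {

    // Hypothetical helper: returns, for each thread, the partition numbers it would own.
    static List<List<Integer>> assign(int numPartitions, int numThreads) {
        if (numThreads <= 0) {
            throw new IllegalArgumentException("numThreads must be positive");
        }
        List<List<Integer>> result = new ArrayList<>();
        int perThread = numPartitions / numThreads; // base share of every thread
        int extra = numPartitions % numThreads;     // leftover partitions, one each for the first threads
        for (int i = 0; i < numThreads; i++) {
            int start = perThread * i + Math.min(i, extra);
            int length = perThread + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = start; p < start + length; p++) {
                owned.add(p);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // 10 partitions, 3 threads, as in the example above.
        System.out.println(assign(10, 3)); // [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
    }
}
```

With more threads than partitions, the trailing lists come back empty, which matches the advice in (1) to keep the thread count at or below the partition count.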

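2. Executors.newFixedThreadPool(a_numThreads) creates a fixed-size thread pool: each submitted task starts a new thread until the pool reaches its maximum size. Once the pool is full its size stays constant, and if a thread dies because of an exception, the pool replaces it with a new one.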

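3. props.put("auto.offset.reset", "smallest") starts consumption from the smallest available offset when the group has no committed offset. If it is not set, the default is largest, and the consumer will not receive messages the producer created earlier.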

4. Using the old consumer API requires dependencies on both kafka_2.11 and kafka-clients.

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
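The fixed-size thread pool behaviour described in note 2 can be observed with plain JDK classes. The sketch below is illustrative only: FixedPoolSketch and distinctWorkerThreads are hypothetical names, and the tasks merely record which worker thread ran them instead of consuming a KafkaStream.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class FixedPoolSketch {

    // Submits taskCount trivial tasks to a pool of poolSize threads and
    // returns how many distinct worker threads actually ran them.
    static int distinctWorkerThreads(int poolSize, int taskCount) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        Set<String> workerNames = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < taskCount; i++) {
            executor.submit(() -> {
                workerNames.add(Thread.currentThread().getName());
            });
        }
        executor.shutdown(); // no new tasks; already submitted ones still run
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return workerNames.size();
    }

    public static void main(String[] args) {
        // Never more than 3 worker threads, however many tasks are submitted.
        System.out.println(distinctWorkerThreads(3, 20));
    }
}
```

In the consumer above, each submitted task is a long-running Consumerwork loop, so the pool size is chosen to equal the number of streams.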

3. Writing a consumer with the Old SimpleConsumer API

This is a lower-level and more complex API

When to use it

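Because this API leaves much more under your own control, and is correspondingly more complex, the official documentation lists the scenarios it suits. These can also be read as things the High Level Consumer API cannot do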

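1. Reading a message multiple times
2. Consuming only a subset of the partitions of a topic in one process
3. Managing transactions to make sure each message is processed exactly once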

What you have to handle yourself

1. You must track the offsets in your application
2. You must find the lead broker for the given topic and partition
3. You must handle broker changes

Steps for using SimpleConsumer

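First, decide which partition of which topic to read
Then, find the leader broker for that partition, and through it the brokers that hold replicas of the partition
Next, build the requests and fetch the data yourself
Finally, detect and handle changes of the leader broker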

Example

package com.goodix.kafka.oldconsumer;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Scanner;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleExample {
    private static Logger LOG = LoggerFactory.getLogger(SimpleExample.class);
    public static void main(String args[]) {
        SimpleExample example = new SimpleExample();
        Scanner sc = new Scanner(System.in);
        System.out.println("Enter the broker IP address (e.g. 172.16.49.173)");
        String brokerIp = sc.nextLine();
        List<String> seeds = new ArrayList<String>();
        seeds.add(brokerIp);
        System.out.println("Enter the broker port (e.g. 9092)");
        int port = Integer.parseInt( sc.nextLine());
        System.out.println("Enter the topic to read (e.g. test)");
        String topic = sc.nextLine();
        System.out.println("Enter the partition to read (e.g. 0)");
        int partition = Integer.parseInt( sc.nextLine());
        System.out.println("Enter the maximum number of messages to read (e.g. 10000)");
        long maxReads = Long.parseLong( sc.nextLine());

        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            LOG.error("Oops:" + e);
             e.printStackTrace();
        }
    }

    private List<String> m_replicaBrokers = new ArrayList<String>();

    public SimpleExample() {
        m_replicaBrokers = new ArrayList<String>();
    }

    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // find the meta data about the topic and partition we are interested in
        // Fetch the metadata of the given topic partition
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            LOG.error("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            LOG.error("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;
        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);

        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder()
                    .clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                LOG.error("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5) break;
                if (code == ErrorMapping.OffsetOutOfRangeCode())  {
                    // We asked for an invalid offset. For simple case ask for the last element to reset
                    readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    LOG.error("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                LOG.info("the messag's offset is :"+String.valueOf(messageAndOffset.offset()) + " and the value is :" + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null) consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            LOG.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }
    /**
     * Finds a leader broker.
     * Walks each broker, fetches the topic metadata, then walks each partition's metadata and returns once the wanted partition is found.
     * The leader broker is read from the returned PartitionMetadata.leader().host().
     * @param a_oldLeader
     * @param a_topic
     * @param a_partition
     * @param a_port
     * @return
     * @throws Exception
     */
    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give ZooKeeper a second to recover
                // second time, assume the broker did recover before failover, or it was a non-Broker issue
                //
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        LOG.error("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }
    /**
     * 
     * @param a_seedBrokers
     * @param a_port
     * @param a_topic
     * @param a_partition
     * @return
     */
    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;

        loop:
        for (String seed : a_seedBrokers) { // try each seed broker in turn
            SimpleConsumer consumer = null;
            try {
                // Create a SimpleConsumer for the metadata lookup
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                // Send the TopicMetadataRequest
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
                // Read the topic metadata from the response
                List<TopicMetadata> metaData = resp.topicsMetadata();
                // Walk the metadata of every partition
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        // Is this the partition we are looking for?
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            // Found it; stop searching
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                LOG.info("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
                        + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null) consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.BrokerEndPoint replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}

4. Using the New Consumer API

(1) Committing offsets automatically

Properties props = new Properties();
// Kafka broker address(es); not every broker in the cluster has to be listed, one or a few is enough
props.put("bootstrap.servers", "172.16.49.173:9092");
// Consumer group name; required
props.put("group.id", a_groupId);
// Commit offsets automatically, at the frequency set by auto.commit.interval.ms
props.put("enable.auto.commit", "true");
// Offset commit interval in milliseconds
props.put("auto.commit.interval.ms", "1000");
// Start from the earliest offset when this group.id has no committed offset. If unset, the default is latest, i.e. the offset of the topic's newest message
// With latest, the consumer only receives messages produced after it starts
props.put("auto.offset.reset", "earliest");
// Session timeout: the consumer is considered dead if no heartbeat arrives within this time
props.put("session.timeout.ms", "30000");
// Deserializer classes for keys and values
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Subscribe to the topic
consumer.subscribe(Arrays.asList("topic_test"));
while (true) {
    // Wait up to 100 ms for records (the argument is a timeout, not a record count)
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
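How auto.offset.reset interacts with committed offsets can be summarised in a few lines. The following is a simplified model, not Kafka's implementation: OffsetResetSketch and startingOffset are hypothetical names, and the model ignores cases such as a committed offset that has fallen out of range or the "none" setting. The point it illustrates is that the reset policy only matters when the group has no committed offset for the partition.

```java
final class OffsetResetSketch {

    // committedOffset == null means the group has no committed offset for the partition.
    // earliest is the first offset still available; latest is the offset the next
    // produced message will receive.
    static long startingOffset(Long committedOffset, long earliest, long latest, String autoOffsetReset) {
        if (committedOffset != null) {
            return committedOffset; // a committed offset always wins over the reset policy
        }
        switch (autoOffsetReset) {
            case "earliest":
                return earliest;
            case "latest":
                return latest;
            default:
                throw new IllegalArgumentException("unsupported policy in this sketch: " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        System.out.println(startingOffset(null, 0L, 42L, "earliest")); // 0
        System.out.println(startingOffset(null, 0L, 42L, "latest"));   // 42
        System.out.println(startingOffset(7L, 0L, 42L, "earliest"));   // 7
    }
}
```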

Points to note:

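group.id: must be set
auto.offset.reset: set it to earliest to receive messages produced before the consumer started; leave it unset to receive only messages produced after the consumer starts
enable.auto.commit (default true): to commit offsets manually, set it to false and call consumer.commitSync() at the appropriate point; if the commit is never made, every restart of the consumer re-reads the messages from the beginning (when auto.offset.reset=earliest)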

(2) Controlling offset commits yourself

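Often we want a message to count as consumed only after it has been received and some processing logic has run. This can be achieved by controlling the offset commits ourselves.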

Example 1: committing offsets in batches

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;



/**
 * Commits offsets manually in batches
 * @author lxh
 *
 */
public class ManualOffsetConsumer {
    private static Logger LOG = LoggerFactory.getLogger(ManualOffsetConsumer.class);
    public ManualOffsetConsumer() {
        // TODO Auto-generated constructor stub
    }

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        //props.put("bootstrap.servers", bootstrapServers);//"172.16.49.173:9092,172.16.49.173:9093");
        // Kafka broker address
        props.put("bootstrap.servers", "172.16.49.173:9092");
        // Consumer group name
        props.put("group.id","manual_g1");

        // Disable auto commit; offsets are committed explicitly in the loop
        props.put("enable.auto.commit", "false");

        // Start from the earliest offset when this group.id has no committed offset. If unset, the default is latest, i.e. the offset of the topic's newest message
        // With latest, the consumer only receives messages produced after it starts
        props.put("auto.offset.reset", "earliest");
        // Session timeout
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String ,String> consumer = new KafkaConsumer<String ,String>(props);
        consumer.subscribe(Arrays.asList("producer_test"));
        final int minBatchSize = 5;  // number of records to buffer before committing
         List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
         while (true) {
             ConsumerRecords<String, String> records = consumer.poll(100);
             for (ConsumerRecord<String, String> record : records) {
                 LOG.info("consumer message values is "+record.value()+" and the offset is "+ record.offset());
                 buffer.add(record);
             }
             if (buffer.size() >= minBatchSize) {
                 LOG.info("now commit offset");
                 consumer.commitSync();
                 buffer.clear();
             }
         }
    }

}

Example 2: committing offsets manually after each partition has been consumed

package com.goodix.kafka;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Commits offsets manually after each partition has been consumed
 * @author lxh
 *
 */
public class ManualCommitPartion {
    private static Logger LOG = LoggerFactory.getLogger(ManualCommitPartion.class);
    public ManualCommitPartion() {
        // TODO Auto-generated constructor stub
    }

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        //props.put("bootstrap.servers", bootstrapServers);//"172.16.49.173:9092,172.16.49.173:9093");
        // Kafka broker address
        props.put("bootstrap.servers", "172.16.49.173:9092");
        // Consumer group name
        props.put("group.id","manual_g2");

        // Disable auto commit; offsets are committed explicitly in the loop
        props.put("enable.auto.commit", "false");

        // Start from the earliest offset when this group.id has no committed offset. If unset, the default is latest, i.e. the offset of the topic's newest message
        // With latest, the consumer only receives messages produced after it starts
        props.put("auto.offset.reset", "earliest");
        // Session timeout
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String ,String> consumer = new KafkaConsumer<String ,String>(props);
        consumer.subscribe(Arrays.asList("producer_test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    LOG.info("now consumer the message it's offset is :"+record.offset() + " and the value is :" + record.value());
                }
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                LOG.info("now commit the partition[ "+partition.partition()+"] offset");
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    }

}
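The example commits lastOffset + 1 because a committed offset names the next record to read, not the last one processed. The sketch below models only that bookkeeping with plain collections; CommitPositionSketch and commitPositions are hypothetical names, and partitions are represented by their numbers rather than by TopicPartition objects.

```java
import java.util.Map;
import java.util.TreeMap;

final class CommitPositionSketch {

    // partitions[i] and offsets[i] describe the i-th processed record.
    // Returns, per partition, the offset to commit: highest processed offset + 1.
    static Map<Integer, Long> commitPositions(int[] partitions, long[] offsets) {
        if (partitions.length != offsets.length) {
            throw new IllegalArgumentException("arrays must have the same length");
        }
        Map<Integer, Long> next = new TreeMap<>();
        for (int i = 0; i < partitions.length; i++) {
            // Keep the largest "next offset" seen so far for this partition.
            next.merge(partitions[i], offsets[i] + 1, Long::max);
        }
        return next;
    }

    public static void main(String[] args) {
        int[] partitions = {0, 0, 1, 0};
        long[] offsets = {5L, 6L, 12L, 7L};
        System.out.println(commitPositions(partitions, offsets)); // {0=8, 1=13}
    }
}
```

Committing the last processed offset itself, without the + 1, would cause that record to be delivered again after a restart.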

(3) Consuming messages from specific partitions

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Consumes messages from specific partitions
 * @author lxh
 *
 */
public class ManualPartion {
    private static Logger LOG = LoggerFactory.getLogger(ManualPartion.class);
    public ManualPartion() {
        // TODO Auto-generated constructor stub
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka broker address
        props.put("bootstrap.servers", "172.16.49.173:9092");
        // Consumer group name
        props.put("group.id", "manual_g4");
        // Commit offsets automatically, at the frequency set by auto.commit.interval.ms
        props.put("enable.auto.commit", "true");
        // Offset commit interval in milliseconds
        props.put("auto.commit.interval.ms", "1000");
        // Start from the earliest offset when this group.id has no committed offset. If unset, the default is latest, i.e. the offset of the topic's newest message
        // With latest, the consumer only receives messages produced after it starts
        props.put("auto.offset.reset", "earliest");
        // Session timeout
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        TopicPartition partition0 = new TopicPartition("producer_test", 0);
        TopicPartition partition1 = new TopicPartition("producer_test", 1);
        KafkaConsumer<String ,String> consumer = new KafkaConsumer<String ,String>(props);
        consumer.assign(Arrays.asList(partition0, partition1));
        while (true) {
              ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
              for (ConsumerRecord<String, String> record : records)
                  System.out.printf("offset = %d, key = %s, value = %s  \r\n", record.offset(), record.key(), record.value());
              try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

}

Summary

The new consumer API only requires the kafka-clients dependency
The new consumer API is easier to understand and to use

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>

Official recommendations on consumers and partitions

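1. Having more consumers than partitions is wasteful: Kafka does not allow concurrent consumption of a single partition, so the number of consumers should not exceed the number of partitions
2. With fewer consumers than partitions, one consumer handles several partitions. Balance the consumer and partition counts, otherwise data is drawn unevenly from the partitions. Ideally the partition count is an integer multiple of the consumer count, which makes the partition count an important choice; with 24 partitions, for example, it is easy to pick a consumer count
3. A consumer that reads from several partitions gets no ordering guarantee across them; Kafka only guarantees order within a single partition, and the order across partitions depends on the order in which you read
4. Adding or removing consumers, brokers, or partitions triggers a rebalance, after which the partitions assigned to each consumer may change
5. The high-level interface blocks when no data is available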
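Recommendation 2 suggests a partition count that is an integer multiple of the consumer count, with 24 as an example. A minimal sketch of why 24 is convenient: it lists the consumer counts that divide a partition count evenly. ConsumerCountSketch and evenConsumerCounts are hypothetical names used for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

final class ConsumerCountSketch {

    // Returns every consumer count for which each consumer gets the same number of partitions.
    static List<Integer> evenConsumerCounts(int numPartitions) {
        List<Integer> counts = new ArrayList<>();
        for (int consumers = 1; consumers <= numPartitions; consumers++) {
            if (numPartitions % consumers == 0) {
                counts.add(consumers);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(evenConsumerCounts(24)); // [1, 2, 3, 4, 6, 8, 12, 24]
        System.out.println(evenConsumerCounts(10)); // [1, 2, 5, 10]
    }
}
```

With 24 partitions there are eight balanced consumer counts to choose from, whereas 10 partitions offer only four.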


 

How to send a 15 MB message to Kafka; answers from StackOverflow:

Minor changes required for Kafka 0.10 and the new consumer compared to laughing_man's answer:

  • Broker: No changes, you still need to increase the properties message.max.bytes and replica.fetch.max.bytes. message.max.bytes has to be equal to or smaller than replica.fetch.max.bytes.
  • Producer: Increase max.request.size to send the larger message.
  • Consumer: Increase max.partition.fetch.bytes to receive larger messages.


The idea is to have equal size of message being sent from Kafka Producer to Kafka Broker and then received by Kafka Consumer i.e.

Kafka producer --> Kafka Broker --> Kafka Consumer

Suppose the requirement is to send a 15 MB message; then the Producer, the Broker, and the Consumer all need to be in sync.

Kafka Producer sends 15 MB --> Kafka Broker Allows/Stores 15 MB --> Kafka Consumer receives 15 MB

The settings therefore should be:

A.) On Broker: message.max.bytes=15728640 and replica.fetch.max.bytes=15728640

B.) On Consumer: fetch.message.max.bytes=15728640
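The client-side settings named in the two answers can be collected in one place. This is a sketch under stated assumptions: LargeMessageConfigSketch and its methods are hypothetical, only the size-related properties are shown (a real client also needs bootstrap.servers, serializers, and so on), and the broker properties message.max.bytes and replica.fetch.max.bytes are set in the broker configuration rather than in client code.

```java
import java.util.Properties;

final class LargeMessageConfigSketch {

    // 15 MB expressed in bytes; equals the 15728640 used in the answer above.
    static final int MAX_MESSAGE_BYTES = 15 * 1024 * 1024;

    // Producer side: allow requests large enough to carry one 15 MB message.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("max.request.size", String.valueOf(MAX_MESSAGE_BYTES));
        return props;
    }

    // New consumer (Kafka 0.10): raise the per-partition fetch size.
    static Properties newConsumerProps() {
        Properties props = new Properties();
        props.put("max.partition.fetch.bytes", String.valueOf(MAX_MESSAGE_BYTES));
        return props;
    }

    // Old consumer: the equivalent property is fetch.message.max.bytes.
    static Properties oldConsumerProps() {
        Properties props = new Properties();
        props.put("fetch.message.max.bytes", String.valueOf(MAX_MESSAGE_BYTES));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(MAX_MESSAGE_BYTES); // 15728640
        System.out.println(producerProps().getProperty("max.request.size"));
    }
}
```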
