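This example has also been published on my blog, liaosi's blog (Kafka Java code examples and configuration notes), and the code has been uploaded to my personal GitHub.
A few constants shared by the consumer and the producer: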
package com.lzumetal.mq.kafka.demo;

/**
 * <p>Description: constants shared by the producer and consumer examples.</p>
 *
 * @author: liaosi
 * @date: 2018-01-30
 */
public class Constants {

    final static String GROUP_ID = "test_group";
    final static String MY_TOPIC = "myTest";
    final static String KAFKA_SERVER_ADRESS = "192.168.128.1";
    final static int KAFKA_SERVER_PORT = 9092;

}
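KafkaProducer (org.apache.kafka.clients.producer.KafkaProducer) is a client that publishes records to the Kafka cluster. The producer is thread safe: multiple threads can share a single producer instance, and doing so is generally faster than creating one instance per thread. See the official KafkaProducer documentation.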
package com.lzumetal.mq.kafka.demo;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Before;
import org.junit.Test;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * <p>Description: </p>
 * The producer consists of a pool of buffer space holding records that have not yet been sent
 * and a background I/O thread that sends those records to the Kafka cluster.
 * Failing to close the producer after use leaks these resources.
 *
 * @author: liaosi
 * @date: 2018-01-30
 */
public class MyKafkaProducer {

    private Producer<String, String> producer;

    @Before
    public void init() {
        Properties props = new Properties();
        props.put("bootstrap.servers", Constants.KAFKA_SERVER_ADRESS + ":" + Constants.KAFKA_SERVER_PORT);
        /*
         * acks controls how many acknowledgments the producer requires the leader
         * to have received before a send is considered successful.
         * 0: the producer does not wait for any acknowledgment.
         * 1: the producer waits only for the leader's acknowledgment.
         * -1 or all: every replica in the ISR set must acknowledge before the send succeeds.
         */
        //props.put("acks", "all");
        // When retries > 0, a failed send is retried automatically, up to the number of times set by retries.
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        // key.serializer and value.serializer specify how the user-supplied key and value are serialized.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    @Test
    public void produceMsg() {
        try {
            for (int i = 0; i < 10; i++) {
                String msg = "Message_test_" + i;
                System.out.println("produce : " + msg);
                // send() is asynchronous: it adds the record to a buffer of pending records and returns immediately.
                // This lets the producer collect a batch of records and send them together, which is more efficient.
                producer.send(new ProducerRecord<>(Constants.MY_TOPIC, Integer.toString(i), msg));
                sleep(1);
            }
        } finally {
            // Always close the producer so buffered records are flushed and its resources are released.
            producer.close();
        }
    }

    private void sleep(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still see the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).
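bootstrap.servers: the host/port pairs used to connect to the Kafka cluster, in the form host1:port1,host2:port2,…
These servers are used only for the initial connection, to discover the full cluster membership (which may change dynamically), so the list does not need to contain every server (though it should preferably contain more than one, in case one of them is down).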
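As a small illustration of the host1:port1,host2:port2 format, the sketch below builds a bootstrap.servers value from more than one broker. The class name BootstrapServers and the second broker address are hypothetical; only 192.168.128.1:9092 comes from the Constants class above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of the Kafka client API.
final class BootstrapServers {

    // Joins host:port pairs into the comma-separated form expected by bootstrap.servers.
    static String join(List<String> hostPorts) {
        return String.join(",", hostPorts);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Listing more than one broker lets the client bootstrap even if one of them is down.
        // 192.168.128.2 is an assumed second broker, used only for illustration.
        props.put("bootstrap.servers",
                join(Arrays.asList("192.168.128.1:9092", "192.168.128.2:9092")));
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```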
The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed:
acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1.
acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.
acks=all This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.
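acks: the number of acknowledgments the producer requires the leader to have received before it considers a request complete. This setting effectively controls the durability of the records that are sent. The following values are allowed:
(1) acks=0: the producer does not wait for any acknowledgment from the server. The record is added to the socket buffer immediately and considered sent. There is no guarantee that the server has received the record, and the retries setting has no effect (because the client generally does not learn of failures). The offset returned for each record is always -1.
(2) acks=1: the leader writes the record to its local log and responds without waiting for acknowledgment from all followers. If the leader fails right after acknowledging the record but before the followers have replicated it, the record is lost.
(3) acks=all: the leader waits for the full set of in-sync replicas to acknowledge the record. The record is not lost as long as at least one in-sync replica remains alive. This is the strongest guarantee available and is equivalent to acks=-1.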
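The acks values above can be set like any other producer property. The following is a minimal sketch that only builds and validates a Properties object; the class AcksConfig and its method are hypothetical helpers and nothing here connects to Kafka.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper that only builds a Properties object; it does not talk to Kafka.
final class AcksConfig {

    // The values described above: 0, 1 and all (-1 is an alias for all).
    private static final List<String> ALLOWED = Arrays.asList("0", "1", "all", "-1");

    static Properties withAcks(String acks) {
        if (!ALLOWED.contains(acks)) {
            throw new IllegalArgumentException("unsupported acks value: " + acks);
        }
        Properties props = new Properties();
        props.put("acks", acks);
        return props;
    }

    public static void main(String[] args) {
        // Strongest durability: wait for the full set of in-sync replicas.
        System.out.println(withAcks("all"));
    }
}
```

Choosing "all" trades some latency for durability, while "0" does the opposite.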
key.serializer and value.serializer: the serializer classes for the key and value of a message record.
The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will block for max.block.ms after which it will throw an exception.
This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests.
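buffer.memory: the amount of memory the producer can use to buffer records. The value corresponds to the BufferPool inside the RecordAccumulator class, that is, the maximum memory the producer manages for buffering. It is not a hard bound, because not all memory the producer uses goes to buffering: some additional memory is used for compression (if compression is enabled) and for maintaining in-flight requests.
If records are produced faster than they can be sent to the broker, the producer blocks for up to the time set by max.block.ms, after which it throws an exception.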
Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first.
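retries: when retries > 0, a record whose send fails with a potentially transient error is resent automatically, up to the number of times set by retries. If retries is enabled without setting max.in.flight.requests.per.connection to 1, the order of records may change: if two batches are sent to the same partition and the first fails and is retried while the second succeeds, the records in the second batch may appear before those in the first.
The default value of retries is 0 in the client version this article was written against; Kafka clients from 2.1 onward default to Integer.MAX_VALUE.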
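To make the ordering point concrete, here is a minimal sketch of a configuration that enables retries while keeping records in order. The class OrderedRetries is a hypothetical helper; the two property names are the ones discussed above.

```java
import java.util.Properties;

// Hypothetical helper that only builds a Properties object.
final class OrderedRetries {

    static Properties build(int retries) {
        Properties props = new Properties();
        props.put("retries", Integer.toString(retries));
        if (retries > 0) {
            // With retries enabled, allow only one in-flight request per connection
            // so that a retried batch cannot be overtaken by a later one.
            props.put("max.in.flight.requests.per.connection", "1");
        }
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build(3));
    }
}
```

Limiting in-flight requests to 1 reduces throughput, so this is a trade-off rather than a default recommendation.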
The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes.
No attempt will be made to batch records larger than this size.
Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent.
A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records.
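batch.size: to improve performance on both the client and the Kafka cluster by reducing the number of requests, the producer groups records bound for the same partition into batches; batch.size sets the default batch size in bytes. Records larger than this size are not batched. A batch.size that is too small may reduce throughput, while one that is too large may waste memory, because a buffer of batch.size bytes is always allocated up front in anticipation of additional records.
Setting the value to 0 disables batching entirely.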
The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay—that is, rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get batch.size worth of records for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5, for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.
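linger.ms: the producer groups together any records that arrive between request transmissions into a single batched request. Normally this happens only under load, when records arrive faster than they can be sent out. In some circumstances, however, the client may want to reduce the number of requests even under moderate load. This setting does that by adding a small artificial delay: rather than sending a record immediately, the producer waits up to the given delay so that other records can arrive and be batched together. This is analogous to Nagle's algorithm in TCP. The setting gives the upper bound on the batching delay: once batch.size worth of records has accumulated for a partition, the batch is sent immediately regardless of this setting; if fewer bytes have accumulated, the producer lingers for the specified time waiting for more records. The default is 0 (no delay). Setting linger.ms=5, for example, reduces the number of requests sent but adds up to 5 ms of latency to records sent in the absence of load.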
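The interplay between batch.size and linger.ms can be pictured with a toy decision rule. This is a simplified model for illustration only, not the producer's actual implementation; the class LingerModel and its parameters are hypothetical.

```java
// Simplified model of when a per-partition batch is sent; not the real RecordAccumulator logic.
final class LingerModel {

    // A batch is ready when it is full (batch.size reached) or has waited at least linger.ms.
    static boolean readyToSend(int accumulatedBytes, long waitedMs, int batchSize, long lingerMs) {
        return accumulatedBytes >= batchSize || waitedMs >= lingerMs;
    }

    public static void main(String[] args) {
        int batchSize = 16384; // same value as in the producer example above
        long lingerMs = 5;     // the linger.ms=5 example from the text
        System.out.println(readyToSend(16384, 0, batchSize, lingerMs)); // full batch: sent at once
        System.out.println(readyToSend(100, 2, batchSize, lingerMs));   // small batch, still lingering
        System.out.println(readyToSend(100, 5, batchSize, lingerMs));   // linger time elapsed
    }
}
```

With lingerMs set to 0, the second condition is always true, which matches the "no delay" default.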
package com.lzumetal.mq.kafka.demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Test;

import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * <p>Description: </p>
 *
 * @author: liaosi
 * @date: 2018-01-30
 */
public class MyKafkaConsumer {

    /**
     * Commit offsets automatically.
     */
    @Test
    public void consumeMsgAutoCommit() {
        Properties props = new Properties();
        props.put("bootstrap.servers", Constants.KAFKA_SERVER_ADRESS + ":" + Constants.KAFKA_SERVER_PORT);
        props.put("group.id", Constants.GROUP_ID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(Constants.MY_TOPIC));
        // Keep polling; offsets are committed in the background every auto.commit.interval.ms.
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            sleep(1);
        }
    }

    /**
     * Commit offsets manually.
     */
    @Test
    public void consumerMsgManualCommit() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        // auto.commit.interval.ms is omitted here: it has no effect when auto commit is disabled.
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(Constants.MY_TOPIC));
        final int minBatchSize = 200;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            // Commit only after the buffered records have been processed.
            if (buffer.size() >= minBatchSize) {
                insertIntoDb(buffer);
                consumer.commitSync();
                buffer.clear();
            }
        }
    }

    // Stands in for a real database insert; it only prints the records.
    private void insertIntoDb(List<ConsumerRecord<String, String>> buffer) {
        for (ConsumerRecord<String, String> record : buffer) {
            System.out.printf("insertIntoDb:offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }

    private void sleep(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still see the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
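The consumer pulls data starting from the offset committed after the previous round of consumption. Offsets can be committed after consumption in two ways: manually or automatically.
The consumer reads data from a partition by issuing fetch requests. From the point of view of the KafkaConsumer API, however, there is only a poll method, and a call to poll only may issue a fetch request. The reason is that the amount of data read by each fetch request is limited by max.partition.fetch.bytes, while each call to poll is limited by max.poll.records, which caps how many records a single poll returns.
So the following can happen: within the max.partition.fetch.bytes limit, a fetch returns, say, 100 records, which are placed in a local buffer, while max.poll.records allows each poll to return only 15 records. The KafkaConsumer then needs 7 calls to poll to consume the 100 records obtained by that single network fetch: the first 6 polls return 15 records each, and the last returns 10.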
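The arithmetic in this example can be checked with a short simulation. It models only the local buffer being drained by poll; the class PollSimulation is hypothetical and does not use the Kafka client.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: drains a local buffer of fetched records, at most maxPollRecords per poll.
final class PollSimulation {

    // Returns how many records each successive poll hands to the application.
    static List<Integer> pollSizes(int fetched, int maxPollRecords) {
        if (maxPollRecords <= 0) {
            throw new IllegalArgumentException("maxPollRecords must be positive");
        }
        List<Integer> sizes = new ArrayList<>();
        int remaining = fetched;
        while (remaining > 0) {
            int n = Math.min(remaining, maxPollRecords);
            sizes.add(n);
            remaining -= n;
        }
        return sizes;
    }

    public static void main(String[] args) {
        // 100 fetched records, max.poll.records = 15
        System.out.println(pollSizes(100, 15)); // prints [15, 15, 15, 15, 15, 15, 10]
    }
}
```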
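The consumer has another setting, max.poll.interval.ms, which is the maximum allowed interval between calls to poll; its default is 300000 ms (5 minutes). If poll is not called within this interval while heartbeats are still being sent, the consumer is considered to be in a livelock state and is removed from the consumer group. To avoid being removed, the consumer must therefore keep calling poll(timeout). The KafkaConsumer client does not do this for us, so the application has to call poll in a loop itself.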
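When a consumer leaves the group for some reason and the partitions are reassigned, how does another consumer in the same group know from which offset to start reading that partition? In other words, how is it ensured that consumers in the same group do not consume the same messages twice? As described above, a single network fetch pulls in a certain amount of data; if the consumer dies before that data has been fully consumed, will the next fetch start from where the previous fetch left off, so that the unconsumed data is lost?
To handle this, after records have been taken from the local buffer with poll, the client calls commitSync (or commitAsync) to commit the offset of the message that should be read next.
The commit is sent over the network as a commit request and the offset is stored by the coordinator, so the next read (whether or not a rebalance has taken place) resumes from the committed position. Messages are not skipped, and they are not consumed twice as long as the commit reflects everything that has been processed; records processed but not yet committed when a consumer fails can be delivered again.
For offset commits, the Kafka consumer Java client supports two modes: automatic commits by the KafkaConsumer, or manual commits in which the user calls commitSync or commitAsync.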
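The role of the committed offset can be sketched with a tiny in-memory model. It is only an illustration of the bookkeeping (the committed value is the offset of the next message to read, that is, the last processed offset plus one); the class OffsetStore is hypothetical and stands in for the coordinator, and resuming from 0 when nothing was committed is an assumption of this toy model.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the coordinator's per-partition committed offsets.
final class OffsetStore {

    private final Map<Integer, Long> committed = new HashMap<>();

    // Commit the offset of the NEXT record to read: last processed offset + 1.
    void commit(int partition, long lastProcessedOffset) {
        committed.put(partition, lastProcessedOffset + 1);
    }

    // Where a consumer taking over the partition resumes.
    // Returning 0 when nothing was committed is an assumption of this toy model.
    long resumeFrom(int partition) {
        return committed.getOrDefault(partition, 0L);
    }

    public static void main(String[] args) {
        OffsetStore store = new OffsetStore();
        store.commit(0, 41);                      // consumer A processed offsets up to 41
        System.out.println(store.resumeFrom(0));  // consumer B resumes at 42
    }
}
```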
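bootstrap.servers: see the description of bootstrap.servers in the producer section.
key.deserializer and value.deserializer: the deserializer classes for the key and value of a message record.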
A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using subscribe(topic) or the Kafka-based offset management strategy.
group.id: identifies the consumer group that this consumer wants to join. The default value is "" (an empty string).
The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.
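heartbeat.interval.ms: the heartbeat interval. Heartbeats are exchanged between the consumer and the coordinator; they keep the consumer's session alive and help with rebalancing when consumers join or leave the group.
The value must be lower than session.timeout.ms, because when a consumer fails to send heartbeats to the coordinator for longer than session.timeout.ms, it is considered to have left the group, and the partitions assigned to it are reassigned to other consumers in the same group.
Typically the value should be no higher than 1/3 of session.timeout.ms. The default is 3000 (3 s).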
The timeout used to detect consumer failures when using Kafka's group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.
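session.timeout.ms: the consumer session timeout. The consumer sends periodic heartbeats to show that it is alive. If no heartbeat is received within session.timeout.ms, the broker removes the consumer from the group and triggers a rebalance.
The value must lie between group.min.session.timeout.ms and group.max.session.timeout.ms as set in the broker configuration.
The default is 10000 (10 s).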
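The constraints between heartbeat.interval.ms, session.timeout.ms and the broker-side bounds can be written down as simple checks. This is a minimal sketch; the class TimeoutCheck is hypothetical, and the broker bounds used in main (6000 and 300000) are assumed values for illustration, so check your own broker configuration.

```java
// Hypothetical sanity checks for the consumer timeout settings described above.
final class TimeoutCheck {

    // Hard rule: heartbeat.interval.ms must be lower than session.timeout.ms.
    static boolean heartbeatValid(long heartbeatMs, long sessionMs) {
        return heartbeatMs < sessionMs;
    }

    // Rule of thumb: no higher than one third of session.timeout.ms.
    static boolean heartbeatRecommended(long heartbeatMs, long sessionMs) {
        return heartbeatMs * 3 <= sessionMs;
    }

    // session.timeout.ms must lie within the broker's group.min/max.session.timeout.ms.
    static boolean sessionAllowed(long sessionMs, long brokerMinMs, long brokerMaxMs) {
        return sessionMs >= brokerMinMs && sessionMs <= brokerMaxMs;
    }

    public static void main(String[] args) {
        long heartbeatMs = 3000;  // default quoted in the text
        long sessionMs = 10000;   // default quoted in the text
        System.out.println(heartbeatValid(heartbeatMs, sessionMs));
        System.out.println(heartbeatRecommended(heartbeatMs, sessionMs));
        // 6000 and 300000 are assumed broker bounds, for illustration only.
        System.out.println(sessionAllowed(sessionMs, 6000, 300000));
    }
}
```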
If true the consumer's offset will be periodically committed in the background.
enable.auto.commit: whether the consumer commits offsets automatically. The default is true.
The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if enable.auto.commit is set to true.
auto.commit.interval.ms: the auto-commit interval. Range: [0, Integer.MAX_VALUE]; the default is 5000 (5 s).
What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
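auto.offset.reset: tells the consumer what to do when there is no initial offset in Kafka, or when the current offset no longer exists on the server (for example, because the record has been deleted). There are 4 options:
(1) earliest: automatically reset the offset to the earliest offset;
(2) latest: automatically reset the offset to the latest offset;
(3) none: throw an exception to the consumer if no previous offset is found for the consumer's group;
(4) anything else: throw an exception to the consumer.
The default is latest.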
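How auto.offset.reset picks a starting position can be sketched as a small function, using the documented values earliest, latest and none. This is a simplified model, not the client's implementation: the class OffsetReset is hypothetical, and it throws IllegalStateException where the real client raises its own exception type.

```java
// Simplified model of choosing a start offset; not the Kafka client's actual code.
final class OffsetReset {

    // committed may be null (no initial offset); earliest and logEnd bound the offsets still on the server.
    static long startOffset(Long committed, long earliest, long logEnd, String policy) {
        if (committed != null && committed >= earliest && committed <= logEnd) {
            return committed; // a usable committed offset always wins
        }
        switch (policy) {
            case "earliest":
                return earliest;
            case "latest":
                return logEnd;
            default:
                // "none" or any other value: surface the problem to the application.
                throw new IllegalStateException("no valid offset and policy is " + policy);
        }
    }

    public static void main(String[] args) {
        System.out.println(startOffset(null, 5, 20, "earliest")); // no committed offset
        System.out.println(startOffset(10L, 5, 20, "latest"));    // committed offset still valid
        System.out.println(startOffset(2L, 5, 20, "latest"));     // committed offset already deleted
    }
}
```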
The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes.
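fetch.max.wait.ms: after a fetch request reaches the broker, the broker may hold it (when the data available is smaller than fetch.min.bytes), so the request takes longer to return. This setting is the maximum time the broker blocks before answering the fetch request.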
The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency.
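fetch.min.bytes: the minimum amount of data the broker should return for a fetch request. If not enough data is available, the broker waits until that much data has accumulated or the request times out.
Range: [0, Integer.MAX_VALUE]; the default is 1, so that a fetch request is answered as soon as any data is available.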
The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not a absolute maximum. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). Note that the consumer performs multiple fetches in parallel.
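fetch.max.bytes: the maximum amount of data returned from a broker for a single fetch request. If the first record batch in the first non-empty partition of the fetch is larger than this value, the batch is still returned so that the consumer can make progress; in that case only that one batch is returned. Both the broker and the topic limit the size of messages that producers send to them, so when setting this value, refer to the broker's message.max.bytes and the topic's max.message.bytes settings.
Range: [0, Integer.MAX_VALUE]; the default is 52428800 (50 MB).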
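max.partition.fetch.bytes: the maximum amount of data returned per partition for a single fetch request. If the first record batch in the first non-empty partition of the fetch is larger than this value, the batch is still returned so that the consumer can make progress. Both the broker and the topic limit the size of messages that producers send to them, so when setting this value, refer to the broker's message.max.bytes and the topic's max.message.bytes settings.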
The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.
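max.poll.interval.ms: as mentioned earlier, the application must keep calling poll(). If poll() is not called for longer than this value, the consumer is considered failed and the group rebalances to reassign its partitions to another member.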
The maximum number of records returned in a single call to poll().
max.poll.records: the maximum number of records returned by a single call to poll().
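max.poll.records and max.poll.interval.ms interact: in the worst case the application processes a full batch between two calls to poll(). The sketch below estimates whether that worst case fits within the interval. The class PollBudget and the per-record processing times are hypothetical, chosen only for illustration.

```java
// Hypothetical back-of-the-envelope check; it does not use the Kafka client.
final class PollBudget {

    // Worst case between two polls: a full batch of maxPollRecords, each taking perRecordMs to process.
    static boolean fitsWithinInterval(int maxPollRecords, long perRecordMs, long maxPollIntervalMs) {
        return (long) maxPollRecords * perRecordMs < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300000; // 5 minutes
        // 500 records at an assumed 100 ms each: 50 s of work, well within the interval.
        System.out.println(fitsWithinInterval(500, 100, maxPollIntervalMs));
        // 500 records at an assumed 1000 ms each: 500 s of work, the consumer would be considered failed.
        System.out.println(fitsWithinInterval(500, 1000, maxPollIntervalMs));
    }
}
```

If the check fails, lowering max.poll.records or raising max.poll.interval.ms are the two obvious adjustments.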