首先介绍一下 Kafka 生产者发送消息的过程:java
本项目采用 Maven 构建,想要调用 Kafka 生产者 API,须要导入 kafka-clients
依赖,以下:git
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.2.0</version> </dependency>
建立 Kafka 生产者时,如下三个属性是必须指定的:github
建立的示例代码以下:算法
public class SimpleProducer { public static void main(String[] args) { String topicName = "Hello-Kafka"; Properties props = new Properties(); props.put("bootstrap.servers", "hadoop001:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); /*建立生产者*/ Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++) { ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "hello" + i, "world" + i); /* 发送消息*/ producer.send(record); } /*关闭生产者*/ producer.close(); } }
本篇文章的全部示例代码能够从 Github 上进行下载:kafka-basisshell
Kafka 的运行依赖于 zookeeper,须要预先启动,能够启动 Kafka 内置的 zookeeper,也能够启动本身安装的:apache
# zookeeper启动命令 bin/zkServer.sh start # 内置zookeeper启动命令 bin/zookeeper-server-start.sh config/zookeeper.properties
启动单节点 kafka 用于测试:bootstrap
# bin/kafka-server-start.sh config/server.properties
# 建立用于测试主题 bin/kafka-topics.sh --create \ --bootstrap-server hadoop001:9092 \ --replication-factor 1 --partitions 1 \ --topic Hello-Kafka # 查看全部主题 bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092
启动一个控制台消费者用于观察写入状况,启动命令以下:api
# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic Hello-Kafka --from-beginning
此时能够看到消费者控制台,输出以下,这里 kafka-console-consumer
只会打印出值信息,不会打印出键信息。数组
在这里可能出现的一个问题是:生产者程序在启动后,一直处于等待状态。这一般出如今你使用默认配置启动 Kafka 的状况下,此时须要对 server.properties
文件中的 listeners
配置进行更改:服务器
# hadoop001 为我启动kafka服务的主机名,你能够换成本身的主机名或者ip地址 listeners=PLAINTEXT://hadoop001:9092
上面的示例程序调用了 send
方法发送消息后没有作任何操做,在这种状况下,咱们没有办法知道消息发送的结果。想要知道消息发送的结果,可使用同步发送或者异步发送来实现。
在调用 send
方法后能够接着调用 get()
方法,send
方法的返回值是一个 Future<RecordMetadata>对象,RecordMetadata 里面包含了发送消息的主题、分区、偏移量等信息。改写后的代码以下:
for (int i = 0; i < 10; i++) { try { ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i); /*同步发送消息*/ RecordMetadata metadata = producer.send(record).get(); System.out.printf("topic=%s, partition=%d, offset=%s \n", metadata.topic(), metadata.partition(), metadata.offset()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }
此时获得的输出以下:偏移量和调用次数有关,全部记录都分配到了 0 分区,这是由于在建立 Hello-Kafka
主题时候,使用 --partitions
指定其分区数为 1,即只有一个分区。
topic=Hello-Kafka, partition=0, offset=40 topic=Hello-Kafka, partition=0, offset=41 topic=Hello-Kafka, partition=0, offset=42 topic=Hello-Kafka, partition=0, offset=43 topic=Hello-Kafka, partition=0, offset=44 topic=Hello-Kafka, partition=0, offset=45 topic=Hello-Kafka, partition=0, offset=46 topic=Hello-Kafka, partition=0, offset=47 topic=Hello-Kafka, partition=0, offset=48 topic=Hello-Kafka, partition=0, offset=49
一般咱们并不关心发送成功的状况,更多关注的是失败的状况,所以 Kafka 提供了异步发送和回调函数。 代码以下:
for (int i = 0; i < 10; i++) { ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i); /*异步发送消息,并监听回调*/ producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { System.out.println("进行异常处理"); } else { System.out.printf("topic=%s, partition=%d, offset=%s \n", metadata.topic(), metadata.partition(), metadata.offset()); } } }); }
Kafka 有着默认的分区机制:
某些状况下,你可能有着本身的分区需求,这时候能够采用自定义分区器实现。这里给出一个自定义分区器的示例:
/** * 自定义分区器 */ public class CustomPartitioner implements Partitioner { private int passLine; @Override public void configure(Map<String, ?> configs) { /*从生产者配置中获取分数线*/ passLine = (Integer) configs.get("pass.line"); } @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { /*key 值为分数,当分数大于分数线时候,分配到 1 分区,不然分配到 0 分区*/ return (Integer) key >= passLine ? 1 : 0; } @Override public void close() { System.out.println("分区器关闭"); } }
须要在建立生产者时指定分区器,和分区器所须要的配置参数:
public class ProducerWithPartitioner { public static void main(String[] args) { String topicName = "Kafka-Partitioner-Test"; Properties props = new Properties(); props.put("bootstrap.servers", "hadoop001:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); /*传递自定义分区器*/ props.put("partitioner.class", "com.heibaiying.producers.partitioners.CustomPartitioner"); /*传递分区器所需的参数*/ props.put("pass.line", 6); Producer<Integer, String> producer = new KafkaProducer<>(props); for (int i = 0; i <= 10; i++) { String score = "score:" + i; ProducerRecord<Integer, String> record = new ProducerRecord<>(topicName, i, score); /*异步发送消息*/ producer.send(record, (metadata, exception) -> System.out.printf("%s, partition=%d, \n", score, metadata.partition())); } producer.close(); } }
须要建立一个至少有两个分区的主题:
bin/kafka-topics.sh --create \ --bootstrap-server hadoop001:9092 \ --replication-factor 1 --partitions 2 \ --topic Kafka-Partitioner-Test
此时输入以下,能够看到分数大于等于 6 分的都被分到 1 分区,而小于 6 分的都被分到了 0 分区。
score:6, partition=1, score:7, partition=1, score:8, partition=1, score:9, partition=1, score:10, partition=1, score:0, partition=0, score:1, partition=0, score:2, partition=0, score:3, partition=0, score:4, partition=0, score:5, partition=0, 分区器关闭
上面生产者的建立都仅指定了服务地址,键序列化器、值序列化器,实际上 Kafka 的生产者还有不少可配置属性,以下:
acks 参数指定了必需要有多少个分区副本收到消息,生产者才会认为消息写入是成功的:
设置生产者内存缓冲区的大小。
默认状况下,发送的消息不会被压缩。若是想要进行压缩,能够配置此参数,可选值有 snappy,gzip,lz4。
发生错误后,消息重发的次数。若是达到设定值,生产者就会放弃重试并返回错误。
当有多个消息须要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可使用的内存大小,按照字节数计算。
该参数制定了生产者在发送批次以前等待更多消息加入批次的时间。
客户端 id,服务器用来识别消息的来源。
指定了生产者在收到服务器响应以前能够发送多少个消息。它的值越高,就会占用越多的内存,不过也会提高吞吐量,把它设置为 1 能够保证消息是按照发送的顺序写入服务器,即便发生了重试。
指定了在调用 send()
方法或使用 partitionsFor()
方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法会阻塞。在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常。
该参数用于控制生产者发送的请求大小。它能够指发送的单个消息的最大值,也能够指单个请求里全部消息总的大小。例如,假设这个值为 1000K ,那么能够发送的单个最大消息为 1000K ,或者生产者能够在单个请求里发送一个批次,该批次包含了 1000 个消息,每一个消息大小为 1K。
这两个参数分别指定 TCP socket 接收和发送数据包缓冲区的大小,-1 表明使用操做系统的默认值。
更多大数据系列文章能够参见 GitHub 开源项目: 大数据入门指南