Apache Kafka 编程实战

Apache Kafka 编程实战您可能感性的文章:java

Apache-Kafka简介sql

Apache Kafka安装和使用apache

Apache-Kafka核心概念编程

Apache-Kafka核心组件和流程-协调器c#

Apache-Kafka核心组件和流程(副本管理器)缓存

Apache-Kafka 核心组件和流程-控制器安全

Apache-Kafka核心组件和流程-日志管理器bash

....服务器

本章经过实际例子,讲解了如何使用java进行kafka开发。网络

添加依赖:

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
复制代码

下面是建立主题的代码:

public class TopicProcessor {
private static final String ZK_CONNECT="localhost:2181";
private static final int SESSION_TIME_OUT=30000;
private static final int CONNECT_OUT=30000;

public static void createTopic(String topicName,int partitionNumber,int replicaNumber,Properties properties){
ZkUtils zkUtils = null;
try{
zkUtils=ZkUtils.apply(ZK_CONNECT,SESSION_TIME_OUT,CONNECT_OUT, JaasUtils.isZkSecurityEnabled());
if(!AdminUtils.topicExists(zkUtils,topicName)){
AdminUtils.createTopic(zkUtils,topicName,partitionNumber,replicaNumber,properties,AdminUtils.createTopic$default$6());
}
}catch (Exception e){
e.printStackTrace();
}finally {
zkUtils.close();
}
}

public static void main(String[] args){
createTopic("javatopic",1,1,new Properties());
}
}
复制代码

首先定义了zookeeper相关链接信息。而后在createTopic中,先初始化ZkUtils,和zookeeper交互依赖于它。而后经过AdminUtils先判断是否存在你要建立的主题,若是不存在,则经过createTopic方法进行建立。传入参数包括主题名称,分区数量,副本数量等。

生产者生产消息

生产者生产消息代码以下:

public class MessageProducer {
private static final String TOPIC="education-info";
private static final String BROKER_LIST="localhost:9092";
private static KafkaProducer<String,String> producer = null;

static{
Properties configs = initConfig();
producer = new KafkaProducer<String, String>(configs);
}

private static Properties initConfig(){
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST);
properties.put(ProducerConfig.ACKS_CONFIG,"all");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
return properties;
}

public static void main(String[] args){
try{
String message = "hello world";
ProducerRecord<String,String> record = new ProducerRecord<String,String>(TOPIC,message);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(null==exception){
System.out.println("perfect!");
}
if(null!=metadata){
System.out.print("offset:"+metadata.offset()+";partition:"+metadata.partition());
}
}
}).get();
}catch (Exception e){
e.printStackTrace();
}finally {
producer.close();
}
}
}
复制代码

一、首先初始化KafkaProducer对象。

producer = new KafkaProducer<String, String>(configs);
复制代码

二、建立要发送的消息对象。

ProducerRecord<String,String> record = new ProducerRecord<String,String>(TOPIC,message);
复制代码

三、经过producer的send方法,发送消息

四、发送消息时,能够经过回调函数,取得消息发送的结果。异常发生时,对异常进行处理。

初始化producer时候,须要注意下面属性设置:

properties.put(ProducerConfig.ACKS_CONFIG,"all");
复制代码

这里有三种值可供选择:

  • 0,不等服务器响应,直接返回发送成功。速度最快,可是丢了消息是没法知道的
  • 1,leader副本收到消息后返回成功
  • all,全部参与的副本都复制完成后返回成功。这样最安全,可是延迟最高。

消费者消费消息

咱们直接看代码

public class MessageConsumer {

private static final String TOPIC="education-info";
private static final String BROKER_LIST="localhost:9092";
private static KafkaConsumer<String,String> kafkaConsumer = null;

static {
Properties properties = initConfig();
kafkaConsumer = new KafkaConsumer<String, String>(properties);
kafkaConsumer.subscribe(Arrays.asList(TOPIC));
}

private static Properties initConfig(){
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST);
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"test");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
return properties;
}

public static void main(String[] args){
try{
while(true){
ConsumerRecords<String,String> records = kafkaConsumer.poll(100);
for(ConsumerRecord record:records){
try{
System.out.println(record.value());
}catch(Exception e){
e.printStackTrace();
}
}
}

}catch(Exception e){
e.printStackTrace();
}finally {
kafkaConsumer.close();
}
}
}
复制代码

代码逻辑以下:

一、初始化消费者KafkaConsumer,并订阅主题。

kafkaConsumer = new KafkaConsumer<String, String>(properties);
kafkaConsumer.subscribe(Arrays.asList(TOPIC));
复制代码

二、循环拉取消息

ConsumerRecords<String,String> records = kafkaConsumer.poll(100);
复制代码

poll方法传入的参数100,是等待broker返回数据的时间,若是超过100ms没有响应,则再也不等待。

三、拉取回消息后,循环处理。

for(ConsumerRecord record:records){
try{
System.out.println(record.value());
}catch(Exception e){
e.printStackTrace();
}
}
复制代码

消费相关代码比较简单,不过这个版本没有处理偏移量提交。学习过第四章-协调器相关的同窗应该还记得偏移量提交的问题。我曾说过最佳实践是同步和异步提交相结合,同时在特定的时间点,好比再均衡前进行手动提交。


加入偏移量提交,须要作以下修改:

一、enable.auto.commit设置为false

二、消费代码以下:

public static void main(String[] args){
try{
while(true){
ConsumerRecords<String,String> records =
kafkaConsumer.poll(100);
for(ConsumerRecord record:records){
try{
System.out.println(record.value());
}catch(Exception e){
e.printStackTrace();
}
}
kafkaConsumer.commitAsync();
}

}catch(Exception e){
e.printStackTrace();
}finally {
try{
kafkaConsumer.commitSync();
}finally {
kafkaConsumer.close();
}
}
}
复制代码

三、订阅消息时,实现再均衡的回调方法,在此方法中手动提交偏移量

kafkaConsumer.subscribe(Arrays.asList(TOPIC), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
//再均衡以前和消费者中止读取消息以后调用
kafkaConsumer.commitSync(currentOffsets);
}
});
复制代码


经过以上三步,咱们把自动提交偏移量改成了手动提交。正常消费时,异步提交kafkaConsumer.commitAsync()。即便偶尔失败,也会被后续成功的提交覆盖掉。而在发生异常的时候,手动提交 kafkaConsumer.commitSync()。此外在步骤3中,咱们经过实现再均衡时的回调方法,手动同步提交偏移量,确保了再均衡前偏移量提交成功。

以上面的最佳实践提交偏移量,既能保证消费时较高的效率,又可以尽可能避免重复消费。不过因为重复消费没法100%避免,消费逻辑须要本身处理重复消费的判断。

更多你可能感兴趣的文章:
1-Flink入门
2-本地环境搭建&构建第一个Flink应用
3-DataSet API
4-DataSteam API
5-集群部署
6-分布式缓存
7-重启策略
8-Flink中的窗口
9-Flink中的Time
Flink时间戳和水印
Broadcast广播变量
FlinkTable&SQL
Flink实战项目实时热销排行
Flink写入RedisSink
Flink消费Kafka写入Mysql
Flink组件和逻辑计划
Flink执行计划生成
JobManager中的基本组件(1)
JobManager中的基本组件(2)
JobManager中的基本组件(3)
TaskManager
算子
网络
水印WaterMark
CheckPoint
任务调度与负载均衡
异常处理
Alibaba Blink新特性
Java高级特性加强-集合
Java高级特性加强-多线程
Java高级特性加强-Synchronized
Java高级特性加强-volatile
Java高级特性加强-并发集合框架
Java高级特性加强-分布式
Java高级特性加强-Zookeeper
Java高级特性加强-JVM
Java高级特性加强-NIO
Java高级特性加强-Netty

你真的不关注一下嘛~

相关文章
相关标签/搜索