其余更多java基础文章:
java基础学习(目录)java
学习资料:kafka数据可靠性深度解读apache
消息发送时都被发送到一个topic,其本质就是一个目录,而topic是由一些Partition Logs(分区日志)组成,其组织结构以下图所示:bootstrap
咱们能够看到,每一个Partition中的消息都是有序的,生产的消息被不断追加到Partition log上,其中的每个消息都被赋予了一个惟一的offset值。缓存
同一个partition可能会有多个replication(对应 server.properties 配置中的 default.replication.factor=N)。没有replication的状况下,一旦broker 宕机,其上全部 patition 的数据都不可被消费,同时producer也不能再将数据存于其上的patition。引入replication以后,同一个partition可能会有多个replication,而这时须要在这些replication之间选出一个leader,producer和consumer只与这个leader交互,其它replication做为follower从leader 中复制数据。bash
ACK,HW,ISR等能够阅读kafka数据可靠性深度解读学习
简单来讲:服务器
- HW是HighWatermark的缩写,是指consumer可以看到的此partition的位置
- ISR (In-Sync Replicas),这个是指副本同步队列。全部的副本(replicas)统称为Assigned Replicas,即AR。ISR是AR中的一个子集,由leader维护ISR列表,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度),任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。
- ack机制:在kafka发送数据的时候,每次发送消息都会有一个确认反馈机制,确保消息正常的可以被收到。
kafka提供了两套consumer API:高级Consumer API和低级API。架构
消费者是以consumer group消费者组的方式工做,由一个或者多个消费者组成一个组,共同消费一个topic。每一个分区在同一时间只能由group中的一个消费者读取,可是多个group能够同时消费这个partition。并发
import org.apache.kafka.clients.producer.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class KafkaProducerDemo {
public static void main(String[] args){
//test();
test2();
}
public static void test(){
Properties props= new Properties();
props.put("bootstrap.servers", "172.26.40.181:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer producer = new KafkaProducer(props);
for(int i = 0; i < 10; i++){
producer.send(new ProducerRecord("first",Integer.toString(i), Integer.toString(i)));
}
producer.close();
}
/**
* 带回调函数
*/
public static void test2(){
Properties props = new Properties();
// Kafka服务端的主机名和端口号
props.put("bootstrap.servers", "172.26.40.181:9092");
// 等待全部副本节点的应答
props.put("acks", "all");
// 消息发送最大尝试次数
props.put("retries", 0);
// 一批消息处理大小
props.put("batch.size", 16384);
// 增长服务端请求延时
props.put("linger.ms", 1);
// 发送缓存区内存大小
props.put("buffer.memory", 33554432);
// key序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//拦截器
List<String> interceptor = new ArrayList<>();
interceptor.add("com.hiway.practice.kafka.TimeInterceptor");
interceptor.add("com.hiway.practice.kafka.CounterInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptor);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
for (int i = 0; i < 50; i++) {
kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (metadata != null) {
System.err.println(metadata.partition() + "---" + metadata.offset());
}
}
});
}
kafkaProducer.close();
}
}
复制代码
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class CustomNewConsumer {
public static void main(String[] args) {
Properties props = new Properties();
// 定义kakfa 服务的地址,不须要将全部broker指定上
props.put("bootstrap.servers", "172.26.40.181:9092");
// 制定consumer group
props.put("group.id", "test");
// 是否自动确认offset
props.put("enable.auto.commit", "true");
// 自动确认offset的时间间隔
props.put("auto.commit.interval.ms", "1000");
// key的序列化类
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// value的序列化类
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 定义consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 消费者订阅的topic, 可同时订阅多个
consumer.subscribe(Arrays.asList("first"));
while (true) {
// 读取数据,读取超时时间为100ms
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
复制代码
public class TimeInterceptor implements ProducerInterceptor<String,String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return new ProducerRecord<>(record.topic(),record.partition(),record.timestamp(),record.key(),System.currentTimeMillis() + "," + record.value().toString());
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
public class CounterInterceptor implements ProducerInterceptor<String, String> {
private int errorCounter = 0;
private int successCounter = 0;
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
// 统计成功和失败的次数
if (exception == null) {
successCounter++;
} else {
errorCounter++;
}
}
@Override
public void close() {
// 保存结果
System.out.println("Successful sent: " + successCounter);
System.out.println("Failed sent: " + errorCounter);
}
}
复制代码
1. Uncaught error in kafka producer I/O thread错误
这个问题主要是服务器上的kafka版本和IDEA中的kafka版本不一致致使的。负载均衡
2.producer发送数据到集群上无反应分布式
将kafka/config/server.properties文件中advertised.listeners改成以下属性。172.26.40.181是我虚拟机的IP。改完后重启,OK了。Java端的代码终于能通讯了 advertised.listeners=PLAINTEXT://172.26.40.181:9092 advertised.listeners上的注释是这样的:
#Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
复制代码
意思就是说:hostname、port都会广播给producer、consumer。若是你没有配置了这个属性的话,则使用listeners的值,若是listeners的值也没有配置的话,则使用 java.net.InetAddress.getCanonicalHostName()返回值(这里也就是返回localhost了)。