消息队列之activeMQhtml
消息队列之RabbitMQjava
kafka是由scala语言开发的一个多分区,多副本的而且居于zookeeper协调的分布式的发布-订阅消息系统。具备高吞吐、可持久化、可水平扩展、支持流处理等特性;可以支撑海量数据的数据传递;而且将消息持久化到磁盘中,并对消息建立了备份保证了数据的安全。kafka在保证了较高的处理速度的同时,又能保证数据处理的低延迟和数据的零丢失。node
kafka的特性:shell
kafka的主要应用场景:数据库
基本流程:apache
kafka的关键角色:bootstrap
安装kafka的前提是安装zookeeper以及jdk环境。我这里安装的版本是jdk1.8.0_20,kafka_2.11-1.0.0,zookeeper-3.4.14。kafka与jdk的版本必定要对应。我以前用的kafka_2.12_2.3.0,就不行缓存
1.将kafka的文件上传到home目录下并解压缩到/usr/local目录下安全
root@localhost home]# tar -xvzf kafka_2.11-1.0.0.tgz -C /usr/local
2.进入kafka的config服务器
[root@localhost local]# cd /usr/local/kafka_2.11-1.0.0/config
3.编辑server.properties文件
# 若是是集群环境中,则每一个broker.id要设置为不一样 broker.id=0 # 将下面这一行打开,这至关于kafka对外提供服务的入口 listeners=PLAINTEXT://192.168.189.150:9092 # 日志存储位置:log.dirs=/tmp/kafka_logs 改成 log.dirs=/usr/local/kafka_2.11-1.0.0/logs # 修改zookeeper的地址 zookeeper.connect=192.168.189.150:2181 # 修改zookeeper的链接超时时长,默认为6000(可能会超时) zookeeper.connection.timeout.ms=10000
3.启动zookeeper
由于我是配置的zookeeper集群,因此须要将三台zookeeper都启动。只启动单台服务器zookeeper在选举的时候将不可进行(当整个集群超过半数机器宕机,zookeeper会认为集群处于不可用状态)
[root@localhost ~]# zkServer.sh start # 查看状态 [root@localhost ~]# zkServer.sh status
4.启动kafka
[root@localhost kafka_2.11-1.0.0]# bin/kafka-server-start.sh config/server.properties # 也可使用后台启动的方式,若是不使用后台启动,则在启动后操做须要新开一个窗口才能操做 [root@localhost kafka_2.11-1.0.0]# bin/kafka-server-start.sh -daemon config/server.properties
5.建立一个主题
# --zookeeper: 指定了kafka所链接的zookeeper的服务地址 # --partitions: 指定了分区的个数 # --replication-factor: 指定了副本因子 [root@localhost kafka_2.11-1.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic charon --partitions 2 --replication-factor 1 Created topic "charon".
6.展现全部的主题(验证建立的主题是否有问题)
[root@localhost kafka_2.11-1.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --list charon
7.查看某个主题的详情
[root@localhost kafka_2.11-1.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic charon Topic:charon PartitionCount:2 ReplicationFactor:1 Configs: Topic: charon Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: charon Partition: 1 Leader: 0 Replicas: 0 Isr: 0
8.新开一个窗口启动消费者接收消息.
--bootstrap-server:指定链接kafka集群的地址,9092是kafka服务的端口。由于个人配置文件中配置的是具体地址,因此须要写明具体地址。不然会报 [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available.的错
[root@localhost kafka_2.11-1.0.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.189.150:9092 --topic charon
9.新开一个窗口启动生产者产生消息
--bootstrap-server:指定链接kafka集群的地址,9092是kafka服务的端口。由于个人配置文件中配置的是地址。
[root@localhost kafka_2.11-1.0.0]# bin/kafka-console-producer.sh --broker-list 192.168.189.150:9092 --topic charon
10.产生消息并消费消息
# 生产者生产消息 >hello charon good evening # 消费者这边接收到的消息 hello charon good evening
固然上面这种方式,只有在同一个网段才能实现。
kafka生产流程:
1)producer先从zookeeper的 "/brokers/.../state"节点找到该partition的leader
2)producer将消息发送给该leader
3)leader将消息写入本地log
4)followers从leader pull消息,写入本地log后向leader发送ACK
5)leader收到全部ISR中的replication的ACK后,增长HW(high watermark,最后commit 的offset)并向producer发送ACK
消费组:
kafka消费者是消费组的一部分,当多个消费者造成一个消费组来消费主题的时候,每一个消费者都会收到来自不一样分区的消息。假如消费者都在同一个消费者组里面,则是工做-队列模型。假如消费者在不一样的消费组里面,则是发布-订阅模型。
当单个消费者没法跟上数据的生成速度时,就能够增长更多的消费者来分担负载,每一个消费者只处理部分分区的消息,从而实现单个应用程序的横向伸缩。可是千万不要让消费者的数量少于分区的数量,由于此时会有多余的消费者空闲。
当有多个应用程序都须要从kafka获取消息时,让每一个应用程序对应一个消费者组,从而使每一个应用程序都能获取一个或多个topic的所有消息。每一个消费者对应一个线程,若是要在同一个消费者组中运行多个消费者,须要让每一个消费者运行在本身的线程中。
1.添加依赖:
<!--添加kafka的依赖--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>1.0.0</version> </dependency>
生产者代码:
package kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; /** * @className: Producer * @description: kafka的生产者 * @author: charon * @create: 2021-01-18 08:52 */ public class Producer { /**topic*/ private static final String topic = "charon"; public static void main(String[] args) { // 配置kafka的属性 Properties properties = new Properties(); // 设置地址 properties.put("bootstrap.servers","192.168.189.150:9092"); // 设置应答类型,默认值为0。(0:生产者不会等待kafka的响应;1:kafka的leader会把这条消息写到本地日志文件中,但不会等待集群中其余机器的成功响应; // -1(all):leader会等待全部的follower同步完成,确保消息不会丢失,除非kafka集群中的全部机器挂掉,保证可用性) properties.put("acks","all"); // 设置重试次数,大于0,客户端会在消息发送失败是从新发送 properties.put("reties",0); // 设置批量大小,当多条消息须要发送到同一个分区时,生产者会尝试合并网络请求,提交效率 properties.put("batch.size",10000); // 生产者设置序列化方式,默认为:org.apache.kafka.common.serialization.StringSerializer properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 建立生产者 KafkaProducer producer = new KafkaProducer(properties); for (int i = 0; i < 5; i++) { String message = "hello,charon message "+ i ; producer.send(new ProducerRecord(topic,message)); System.out.println("生产者发送消息:" + message); } producer.close(); } }
消费者代码:
package kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Arrays; import java.util.List; import java.util.Properties; /** * @className: Consumer * @description: kafka的消费者 * @author: charon * @create: 2021-01-18 08:53 */ public class Consumer implements Runnable{ /**topic*/ private static final String topic = "charon"; /**kafka消费者*/ private static KafkaConsumer kafkaConsumer; /**消费消息*/ private static ConsumerRecords<String,String> msgList; public static void main(String[] args) { // 配置kafka的属性 Properties properties = new Properties(); // 设置地址 properties.put("bootstrap.servers","192.168.189.150:9092"); // 消费者设置反序列化方式 properties.put("key.deserializer", StringDeserializer.class.getName()); properties.put("value.deserializer", StringDeserializer.class.getName()); // 设置消费组 properties.put("group.id","test01"); // 设置容许自动提交 properties.put("enable.auto.commit","true"); // 设置自动提交的时间间隔 properties.put("auto.commit.interval.ms","1000"); // 设置链接的超时市场 properties.put("session.timeout.ms","30000"); // 建立消费者 kafkaConsumer = new KafkaConsumer(properties); // 指定分区 kafkaConsumer.subscribe(Arrays.asList(topic)); Consumer consumer = new Consumer(); new Thread(consumer).start(); // kafkaConsumer.close(); } @Override public void run() { for (;;){ // 获取数据的超时1000ms msgList = kafkaConsumer.poll(1000); if(null != msgList && msgList.count() > 0){ for (ConsumerRecord<String,String> consumerRecord: msgList ) { System.out.println("消费者接受到消息,开始消费:" + consumerRecord); System.out.println("topic= "+consumerRecord.topic()+" ,partition= "+consumerRecord.partition()+" ,offset= "+consumerRecord.offset()+" ,value="+consumerRecord.value()+"\n"); } }else{ // 若是没有接受到数据,则阻塞一段时间 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
kafka不会像activemq那样须要获得消费者确认,因此消费者须要追踪kafka的消息消费到分区中的哪一个位置了,这个位置就叫偏移量。把更新分区当前位置的操做叫作提交。若是消费者发生崩溃或者有新的消费者加入群组,就会触发再均衡,完成再均衡以后,每一个消费者可能分配到新的分区上,而不是以前处理的那个,为了可以继续以前的工做,消费者须要读取每一个分区最后一次提交的偏移量,而后从偏移量指定的地方继续处理。
这样的话就可能会有如下两种状况:
1.提交的偏移量小于客户端处理的偏移量
若是提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被从新处理。
2.提交的偏移量大于客户端处理的偏移量
若是提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会丢失。
kafka的提交方式:
自动提交模式:消费者拉取数据以后自动提交偏移量,不关心后续对消息的处理是否正确。优势是:消费快,适用于数据一致性弱的业务场景,缺点为:消息容易丢失或者重复消费。
将enable.auto.commit被设为 true
手动提交模式:消费者拉取数据以后作业务处理,并且须要业务处理完成才算真正消费成功。缺点:在broker对提交请求作出回应以前,应用程序会一直阻塞,会限制应用程序的吞吐量。
将enable.auto.commit被设为 false;
在消息处理完成后手动调用consumer.commitSync();
异步提交:只须要发送提交请求,无需等待broker的响应
在消息处理完成后手动调用consumer.commitAsync();这个方法也支持回调,在broker做出响应时会执行回调,回调常常被用于记录提交失败将错误信息和偏移量记录下来,若是从新提交,则须要注意提交的顺序。
在为消费者分配新的分区或者移除旧的分区时,能够经过消费者API执行一些应用程序代码,在调用subscribe(Pattern pattern, ConsumerRebalanceListener listener)时,能够传入一个再均衡监听器。
须要实现的两个方法:
public void onPartitionRevoked(Collection
在再均衡开始以前和消费者中止读取消息以后被调用,若是在这里提交偏移量,下一个接管分区的消费者就知道从哪里开始读取了,要注意提交的是最近处理过的偏移量,而不是批次中还在处理的最后一个偏移量。
public void onPartitionAssigned(Collection
在从新分配分区以后和消费者开始夫区消息以前被调用
首先来看看kafka的应答类型:
若是是单机环境中,三者没有区别。
kafka的消息重复和丢失可能发生在三个阶段:
1.生产者阶段的缘由为:生产者发送的消息没有收到正确的broker的响应,致使生产者重试。
生产者发送一条消息,broker罗盘之后由于网络等种种缘由,发送端获得一个发送失败的响应或者网络中断,而后prodcuer收到一个可恢复的exception重试消息致使消息重试。
重试过程:
解决方式:
1.启动kafka的幂等性。要启动kafka的幂等性,须要修改配置文件中的:enable.idempotenmce=true,同时要求ack=all且retries>1。若是要提升数据的可靠性,还须要min.insync.replicas这个参数配合,若是ISR的副本数少于min.insync.replicas则会产生异常,缘由:消息被拒绝,同步副本数量少于所需的数量
幂等性的原理:
每一个生产者都有一个PID,服务端回经过PID关联记录每一个生产者的状态,每一个生产者的每一个消息会带上一个递增的序列(sequence),服务端会记录每一个生产者对应的当前的最大的序列(PID+seq),若是新的消息带上的序列不大于当前的最大的seq就拒绝这条消息,若是消息落盘会同时更新最大的seq,这个时候重发的消息会呗服务器拒掉从而避免了消息重复。
2.设置ack=0,即不须要确认,不重试。但可能会丢失数据,因此适用于吞吐量指标重要性高于数据丢失,例如:日志收集。
2.生产者和broker阶段的缘由:
ack=0,不重试。生产者发送消息后,无论结果如何,若是发送失败数据也就丢失了。
ack=1,leader宕机(crash)了,生产者发送消息完,只等待leader写入成功就返回了,leader宕机了,这是follower还没来得及同步,那么消息就丢失了。
unclean.leader.election.enable 配置为true。容许选举ISR之外的副本做为leader,会致使数据丢失,默认为fase(非ISR中的副本不能参与选举)。
生产者发送完异步消息,只等待leader写入成功就返回了,leader宕机了,这时ISR中没有follower,leader从OSR中选举,由于OSR中原本落后于leader而形成数据丢失。
解决方式:
1.配置:ack=-1,retries>1,unclean.leader.election.enable=false
生产者发送完消息,等待follower同步完在返回,若是异常则重试,这时副本的数量可能影响吞吐量,最大不超过5个,通常三个就够了。
2.配置:min.insync.replicas > 1
当生产者将ack设置为all或-1时,min.insync副本指定必须确认写操做成功的最小副本数量,若是不能知足这个最小值,则生产者将引起一个异常。当一块儿使用时,min.insync.replicas和ack容许执行更大的持久性保证。
3.失败的offset单独记录
生产者发送消息,回自动重试,遇到不可恢复的异常会抛出,这时能够捕获异常记录到数据库或缓存中,进行单独处理。
3.消费阶段的缘由:数据消费完没有及时提交offset到broker。消息消费端在消费过程当中挂掉没有及时提交offset到broker,另外一个消费者启动拿到以前记录的offset开始消费,因为offset的滞后性可能会致使新启动的客户端有少许重复消费。
解决方式:
1.取消自动提交,每次消费完或者程序退出时手动提交,这也没有办法保证不会有重复。
2.作幂等性,尽可能让下游作幂等或者尽可能每消费一条消息都记录offset。对于少书严格的场景可能须要吧offset或惟一ID和下游状态更新放在同一个数据库里作事务来保证精确的一次更新或者在下游数据库表里同时记录消费的offset。而后更新数据的时候用消费位点作乐观锁拒绝掉旧的位点的数据更新。
参考文章:
https://www.cnblogs.com/qingyunzong/p/9004509.html
https://www.cnblogs.com/frankdeng/p/9310704.html