MQ,全称消息队列,如今市面上有不少种消息队列,像你们耳熟能详的RabbitMQ,RocketMQ,Kafka等等,接下来为你们详细的介绍消息队列。数据库
俗话说的好,多用多错,不能为了技术而技术,要结合实际的业务场景使用合适的技术。
例如你用了Redis缓存,这时候你也许得考虑主从架构。由于主从架构,你可能得考虑主从切换。同时也许你还得考虑集群模式。这就大大的提升了开发与维护的成本。所以选择一种合适的技术是十分重要的。缓存
RabbitMQ 试用于逻辑相对复杂,同时对于队列的性能需求相对不高的场景。bash
RabbitMQ
发送消息时会先发送到Exchange
(交换机),指定routing key
。Exchange
和队列绑定时指定 binding key
routing key
和 binding key
的匹配规则成立时,才会将消息发往指定的队列。RabbitMQ中 数据存储的最小单位是queue
,而在kafka中 的最小单位是partition
,partition
包含在topic
中,见下图。网络
gourp1
监听一个
topic
, c1和c2会分别消费一个分区。 若是是一个消费组的话,必须指定默认消费者组名称
每条消息在分区中都会有一个偏移量。在消息进入分区前,会给新的消息分配一个惟一的偏移量,保证了每一个分区中消息的顺序性。
在RabbitMQ
中, 多个消费者监听一个队列,因为消息是异步发送的,因为网络等缘由,可能Consumer2
先接受到Message2
,这样会致使消息的处理顺序不会按照消息存入队列的顺序。固然,咱们能够经过一个消费者监听一个队列来保证消息的有序性。
而kafka呢,多个消费者在一个消费者组中,监听一个topic
。topic
中的不一样的分区会被不一样的消费者消费(一个消费者消费一个分区)。固然,咱们只能保证每一个分区的消息时被顺序消费的,不一样的分区则不能保证了。架构
/** 一个消费者 一个消费者组 一个分区 **/
public void send1() {
kafkaTemplate.send("test1",0,"test1","data1");
System.out.println("生产者发送消息");
}
复制代码
@KafkaListener(topics = {"test1"}, groupId = "test1")
public void test1(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("消费者接受消息" + record);
}
}
复制代码
生产者发送消息
消费者接受消息ConsumerRecord(topic = test1, partition = 0, offset = 0, CreateTime = 1560260495702, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
复制代码
## 增长分区数
kafka-topics --zookeeper localhost:2181 --alter --topic test1 --partitions 2
## 结果
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
复制代码
## 生产者
kafkaTemplate.send("test1",0 #分区数,"test1","data1");
kafkaTemplate.send("test1",1,"test11","data11");
System.out.println("生产者发送消息");
复制代码
## 消费者
@KafkaListener(topics = {"test1"}, groupId = "test1")
public void test1(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("消费者1接受消息" + record);
}
}
复制代码
消费者1接受消息ConsumerRecord(topic = test1, partition = 0, offset = 4, CreateTime = 1560261029521, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
消费者1接受消息ConsumerRecord(topic = test1, partition = 1, offset = 1, CreateTime = 1560261029521, serialized key size = 6, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = test11, value = data11)
复制代码
/** 2个消费者 2个消费者组 1个分区 **/
public void send3() {
kafkaTemplate.send("test1",0,"test1","data1");
System.out.println("生产者发送消息");
}
复制代码
@KafkaListener(topics = {"test1"}, groupId = "test1")
public void test1(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("消费者1接受消息" + record);
}
}
@KafkaListener(topics = {"test1"}, groupId = "test2")
public void test2(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("消费者2接受消息" + record);
}
}
复制代码
消费者1接受消息ConsumerRecord(topic = test1, partition = 0, offset = 7, CreateTime = 1560261183386, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
消费者2接受消息ConsumerRecord(topic = test1, partition = 0, offset = 7, CreateTime = 1560261183386, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
复制代码
/** 2个消费者 1个消费者组 2个分区 **/
public void send4() {
kafkaTemplate.send("test1",0,"test1","data1");
kafkaTemplate.send("test1",1,"test11","data11");
System.out.println("生产者发送消息");
}
复制代码
@KafkaListener(topics = {"test1"}, groupId = "test1")
public void test1(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("消费者1接受消息" + record);
}
}
@KafkaListener(topics = {"test1"}, groupId = "test1")
public void test2(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("消费者2接受消息" + record);
}
}
复制代码
消费者1接受消息ConsumerRecord(topic = test1, partition = 1, offset = 3, CreateTime = 1560261444482, serialized key size = 6, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = test11, value = data11)
消费者2接受消息ConsumerRecord(topic = test1, partition = 0, offset = 9, CreateTime = 1560261444482, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
复制代码
/** 2个消费者 1个消费者组 2个分区 **/
public void send4() {
kafkaTemplate.send("test1",0,"test1","data1");
kafkaTemplate.send("test1",1,"test11","data11");
System.out.println("生产者发送消息");
}
复制代码
@KafkaListener(topics = {"test1"}, groupId = "test1")
public void test1(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("消费者1接受消息" + record);
}
}
@KafkaListener(topics = {"test1"}, groupId = "test1")
public void test2(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("消费者2接受消息" + record);
}
}
@KafkaListener(topics = {"test1"}, groupId = "test1")
public void test3(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("消费者3接受消息" + record);
}
}
复制代码
消费者1接受消息ConsumerRecord(topic = test1, partition = 1, offset = 3, CreateTime = 1560261444482, serialized key size = 6, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = test11, value = data11)
消费者2接受消息ConsumerRecord(topic = test1, partition = 0, offset = 9, CreateTime = 1560261444482, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
复制代码