“简单”的消息队列与kafka

小时候就特别喜欢龙系精灵,特别是乘龙,后来才知道只是冰水。。。尴尬。 在宠物小精灵中,乘龙一直是训练家的载人伙伴,和咱们下面的MQ好像有几分类似呢~~

前言

MQ,全称消息队列,如今市面上有不少种消息队列,像你们耳熟能详的RabbitMQ,RocketMQ,Kafka等等,接下来为你们详细的介绍消息队列。数据库

使用场景

俗话说的好,多用多错,不能为了技术而技术,要结合实际的业务场景使用合适的技术。
例如你用了Redis缓存,这时候你也许得考虑主从架构。由于主从架构,你可能得考虑主从切换。同时也许你还得考虑集群模式。这就大大的提升了开发与维护的成本。所以选择一种合适的技术是十分重要的。缓存

异步解耦

  1. 若是是单体应用,在用户下单时,下订单,减库存,物流记录这三个操做是同步阻塞的。若是将这三步的操做存放到各自的消息队列中,而后监听这三个消息队列,那么能够大大的减小时间。
  2. 但同时,因为是分布式应用,你可能得考虑分布式事务的必要性了。此外,为了保证MQ的高可用,你可能得去调研市场上集群模式支持的最好的中间件了。

流量削峰

  1. 在一些秒杀场景,为了防止极高的并发,将数据库冲垮,除了能够在负载均衡端进行限流的设置,同时也能够将用户的请求数据存放到消息队列中,而后经过消息队列中控制处理速度。
  2. 同时这个处理的进程能够监听zk的某个节点,在秒杀结束后,修改这个节点的值,进程监听到这个节点值的变化,将再也不处理请求(能够作成拦截器)。这个有时也被用来做为大数据中流处理的缓冲

市面上的消息队列

RabbitMQ

介绍

RabbitMQ 试用于逻辑相对复杂,同时对于队列的性能需求相对不高的场景。bash

模型

  1. RabbitMQ发送消息时会先发送到Exchange(交换机),指定routing key
  2. Exchange和队列绑定时指定 binding key
  3. 只有当routing keybinding key 的匹配规则成立时,才会将消息发往指定的队列。
  4. 两个消费者监听同一个队列,一条消息不会被两个队列同时消费。这是和kafka有区别的。

kafka

kafka topic

RabbitMQ中 数据存储的最小单位是queue,而在kafka中 的最小单位是partition,partition包含在topic中,见下图。网络

直连模型

一个消费组 gourp1 监听一个 topic, c1和c2会分别消费一个分区。 若是是一个消费组的话,必须指定默认消费者组名称

订阅模型

多个消费组(每一个组中只有一个消费者), 监听一个topic,每一个消费者会消费topic中的全部分区。

kafka 偏移量(offset)

每条消息在分区中都会有一个偏移量。在消息进入分区前,会给新的消息分配一个惟一的偏移量,保证了每一个分区中消息的顺序性。
RabbitMQ中, 多个消费者监听一个队列,因为消息是异步发送的,因为网络等缘由,可能Consumer2先接受到Message2,这样会致使消息的处理顺序不会按照消息存入队列的顺序。固然,咱们能够经过一个消费者监听一个队列来保证消息的有序性。
而kafka呢,多个消费者在一个消费者组中,监听一个topictopic中的不一样的分区会被不一样的消费者消费(一个消费者消费一个分区)。固然,咱们只能保证每一个分区的消息时被顺序消费的,不一样的分区则不能保证了。架构

kafka实战

一个分区,一个消费者组,一个消费者

/** 一个消费者 一个消费者组 一个分区 **/
    public void send1() {
        kafkaTemplate.send("test1",0,"test1","data1");
        System.out.println("生产者发送消息");
    }
复制代码
@KafkaListener(topics = {"test1"}, groupId = "test1")
    public void test1(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println("消费者接受消息" + record);
        }
    }
复制代码
生产者发送消息
消费者接受消息ConsumerRecord(topic = test1, partition = 0, offset = 0, CreateTime = 1560260495702, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
复制代码

2个分区,一个消费者组,一个消费者,消费全部分区

## 增长分区数
kafka-topics --zookeeper localhost:2181 --alter --topic test1 --partitions 2
## 结果
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
复制代码
## 生产者
kafkaTemplate.send("test1",0 #分区数,"test1","data1");
kafkaTemplate.send("test1",1,"test11","data11");
System.out.println("生产者发送消息");
复制代码
## 消费者
@KafkaListener(topics = {"test1"}, groupId = "test1")
    public void test1(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println("消费者1接受消息" + record);
        }
    }
复制代码
消费者1接受消息ConsumerRecord(topic = test1, partition = 0, offset = 4, CreateTime = 1560261029521, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
消费者1接受消息ConsumerRecord(topic = test1, partition = 1, offset = 1, CreateTime = 1560261029521, serialized key size = 6, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = test11, value = data11)
复制代码

一个分区,2个消费者组,2个消费者 ,每一个消费者都消费一个分区

/** 2个消费者 2个消费者组 1个分区 **/
    public void send3() {
        kafkaTemplate.send("test1",0,"test1","data1");
        System.out.println("生产者发送消息");
    }
复制代码
@KafkaListener(topics = {"test1"}, groupId = "test1")
    public void test1(ConsumerRecord<?, ?> record) {

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();
            System.out.println("消费者1接受消息" + record);
        }
    }

    @KafkaListener(topics = {"test1"}, groupId = "test2")
    public void test2(ConsumerRecord<?, ?> record) {

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();
            System.out.println("消费者2接受消息" + record);
        }
    }
复制代码
消费者1接受消息ConsumerRecord(topic = test1, partition = 0, offset = 7, CreateTime = 1560261183386, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
消费者2接受消息ConsumerRecord(topic = test1, partition = 0, offset = 7, CreateTime = 1560261183386, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
复制代码

2个分区,2个消费者,1个消费者组(负载均衡)

/** 2个消费者 1个消费者组 2个分区 **/
    public void send4() {
        kafkaTemplate.send("test1",0,"test1","data1");
        kafkaTemplate.send("test1",1,"test11","data11");
        System.out.println("生产者发送消息");

    }
复制代码
@KafkaListener(topics = {"test1"}, groupId = "test1")
    public void test1(ConsumerRecord<?, ?> record) {

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();
            System.out.println("消费者1接受消息" + record);
        }
    }

    @KafkaListener(topics = {"test1"}, groupId = "test1")
    public void test2(ConsumerRecord<?, ?> record) {

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();
            System.out.println("消费者2接受消息" + record);
        }
    }
复制代码
消费者1接受消息ConsumerRecord(topic = test1, partition = 1, offset = 3, CreateTime = 1560261444482, serialized key size = 6, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = test11, value = data11)
消费者2接受消息ConsumerRecord(topic = test1, partition = 0, offset = 9, CreateTime = 1560261444482, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
复制代码

2个分区,3个消费者,1个消费者组(有个消费者未消费分区)

/** 2个消费者 1个消费者组 2个分区 **/
    public void send4() {
        kafkaTemplate.send("test1",0,"test1","data1");
        kafkaTemplate.send("test1",1,"test11","data11");
        System.out.println("生产者发送消息");

    }
复制代码
@KafkaListener(topics = {"test1"}, groupId = "test1")
    public void test1(ConsumerRecord<?, ?> record) {

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();
            System.out.println("消费者1接受消息" + record);
        }
    }

    @KafkaListener(topics = {"test1"}, groupId = "test1")
    public void test2(ConsumerRecord<?, ?> record) {

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();
            System.out.println("消费者2接受消息" + record);
        }
    }
    
    @KafkaListener(topics = {"test1"}, groupId = "test1")
    public void test3(ConsumerRecord<?, ?> record) {

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();
            System.out.println("消费者3接受消息" + record);
        }
    }
复制代码
消费者1接受消息ConsumerRecord(topic = test1, partition = 1, offset = 3, CreateTime = 1560261444482, serialized key size = 6, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = test11, value = data11)
消费者2接受消息ConsumerRecord(topic = test1, partition = 0, offset = 9, CreateTime = 1560261444482, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
复制代码

kafka消息发送机制

xxxx

未完待续

相关文章
相关标签/搜索