RabbitMQ入门:发布/订阅(Publish/Subscribe)

在前面的两篇博客中html

遇到的实例都是一个消息只发送给一个消费者(工做者),他们的消息模型分别为(P表明生产者,C表明消费者,红色表明队列):ide

此次咱们来看下将一个消息发送给多个消费者(工做者),这种模式通常被称为“发布/订阅”模式。其工做模型为(P表明生产者,X表明Exchange(路由器/交换机),C表明消费者,红色表明队列):ui

咱们发现,工做模型中首次出现路由器,而且每一个消费者有单独的队列。生产者生成消息后将其发送给路由器,而后路由器转送到队列,消费者各自到本身的队列里面获取消息进行消费。在实际的应用场景中,生产者通常不会直接将消息发送给队列,而是发送给路由器进行中转,Exchange必须清楚的知道怎么处理收到的消息:是将消息发送到一个特定队列仍是多有队列,或者直接废弃消息。这种才符合RabbitMQ消息模型的核心思想this

接下来咱们详细展开今天的话题:spa

1、Exchange3d

Exchange在咱们的工做模型中首次出现,所以须要详细介绍下。日志

Exchange分为4种类型:code

Direct:彻底根据key进行投递的,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。
Topic:对key进行模式匹配后进行投递,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。
Fanout:不须要key,它采起广播模式,一个消息进来时,投递到与该交换机绑定的全部队列。
Headers:咱们能够不考虑它。

今天咱们的实例采用fanout类型的exchange。htm

尽管首次出现,可是其实咱们前面的案例中也有用到exchange,只是咱们没有给他名字,用的是RabbitMQ默认的,好比下面这段代码,咱们将路由器名这个参数传入了“”,若是咱们须要本身声明exchange的话,这个就不能传入“”了,而是传入本身定义好的值。blog

2、临时队列

前面两篇博客中,咱们都在使用队列的时候给出了定义好的名字,这在生产者和消费者共用相同队列的时候颇有必要,可是咱们有了exchange,生产者不须要知道有哪些队列,所以队列名字能够不用指定了,而是经过RabbitMQ 接口本身去生成临时队列,队列名字也由RabbitMQ自动生成。经过

能够声明一个非持久的、通道独占的、自动删除的队列,getQueue()方法能够获取随机队列名字。这个名字用来在队列和exchange之间创建binding关系的时候使用:

 

3、代码实现

基于上面exchange和临时队列的知识铺垫,能够展开今天的代码实现了。

  1.  生产者
    public class Product {
        //exchange名字
        public static String EXCHANGE_NAME = "exchange";
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                // 1.建立链接和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.为通道声明exchange和exchange的类型
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
                
                String msg = " hello rabbitmq, this is publish/subscribe mode";
                // 3.发送消息到指定的exchange,队列指定为空,由exchange根据状况判断须要发送到哪些队列
                channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
                System.out.println("product send a msg: " + msg);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                // 4.关闭链接
                if (channel != null) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }
    
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

     

  2. 消费者1
    public class Consumer1 {
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                // 1.建立链接和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.为通道声明exchange以及exchange类型
                channel.exchangeDeclare(Product.EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
    
                // 3.建立随机名字的队列
                String queueName = channel.queueDeclare().getQueue();
    
                // 4.创建exchange和队列的绑定关系
                channel.queueBind(queueName, Product.EXCHANGE_NAME, "");
                System.out.println(" **** Consumer1 keep alive ,waiting for messages, and then deal them");
                // 5.经过回调生成消费者并进行监听
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                            com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                        // 获取消息内容而后处理
                        String msg = new String(body, "UTF-8");
                        System.out.println("*********** Consumer1" + " get message :[" + msg + "]");
                    }
                };
                // 6.消费消息
                channel.basicConsume(queueName, true, consumer);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }

     

  3. 消费者2,核心代码同消费者1同样,只是在日志打印上将"Consumer1"改成"Consumer2"而已。这里再也不列出具体代码。
  4. 先运行消费者1和2,而后运行生产者,观察控制台log打印状况:
    生产者:
    product send a msg:  hello rabbitmq, this is publish/subscribe mode
    
    消费者1**** Consumer1 keep alive ,waiting for messages, and then deal them
    *********** Consumer1 get message :[ hello rabbitmq, this is publish/subscribe mode]
    
    消费者2: **** Consumer2 keep alive ,waiting for messages, and then deal them
    *********** Consumer2 get message :[ hello rabbitmq, this is publish/subscribe mode]

    能够看到,当生产者发出消息后,两个消费者最终都收到了消息。

  5. 咱们去查看RabbitMQ管理页面:

    在Exchanges 标签页里面多了一个名为“exchange”的路由器,他的类型是fanout。点exchange 的link进入详细页面:

    发如今binding项目中有了两条绑定关系,队列的名字也能够看到。将页面切换到Queues标签页:

    出现了两个新的队列,队列名字和绑定关系中的同样,而且队列都是自动删除的、通道独占的。

  6. 而后将消费者1和消费者2都停掉,从新查看管理页面,咱们发现exchange还在,binding关系不存在了,临时队列也自动删除了

相关文章
相关标签/搜索