淘淘商城系列——ActiveMQ发送topic消息和接收topic消息

我相信你们经过前面的学习,已然知道了如何发送队列消息及消费队列消息。本文咱们将一块儿学习如何发送topic消息和接收topic消息。
咱们依然在TestActiceMQ单元测试类中添加一个测试方法,用来测试发送topic消息,以下图所示,其实这个方法与发送队列消息几乎同样,只是建立Destination对象的时候不同而已。
这里写图片描述
为了方便你们复制,现将testTopicProducer测试方法的代码贴出。java

@Test
public void testTopicProducer() throws Exception {
    // 建立一个链接工厂对象
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616");
    // 使用链接工厂对象来建立一个链接
    Connection connection = connectionFactory.createConnection();
    // 开启链接
    connection.start();
    // 使用链接对象建立一个Session对象
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // 使用Session对象来建立一个Topic
    Topic topic = session.createTopic("test-topic");
    // 使用Session对象来建立一个Producer,指定其目的地是Topic
    MessageProducer producer = session.createProducer(topic);
    // 建立一个TextMessage对象
    TextMessage message = session.createTextMessage("使用topic来发送的消息");
    // 使用Producer对象来发送消息
    producer.send(message);
    // 关闭资源
    producer.close();
    session.close();
    connection.close();
}

运行上面的测试方法,运行成功后,咱们访问activemq的管理后台页面,点击”Topics”,能够看到有”test-topic”这一行,压入消息队列一条消息,但因为没有消费者,所以没有消费掉该消息。
这里写图片描述
咱们点击上图的”test-topic”,会看到以下图所示界面。咱们发现刚才咱们发送的消息并无被保存。
这里写图片描述
而咱们发送的queue消息在未被消费前会被保存。
这样的话,就会有个问题,那就是若是发送topic消息时没有消费者,那么这条消息便不复存在了,不会再被消费了。所以咱们要想消息不会被遗失掉,咱们要先打开消费者,而后再发送topic消息。
咱们来写消费topic消息的方法,以下图所示,该方法与咱们上文学习的消费队列消息的方法不一样的是建立Destination的时候不同,同时为了模拟多个消费者,在该方法中添加一条输出信息,标明该方法是第几个消费者。
这里写图片描述
为了方便你们复制,现将testTopicConsumer测试方法的代码贴出。web

@Test
public void testTopicConsumer() throws Exception {
    // 建立一个链接工厂对象
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616");
    // 使用工厂对象建立一个链接
    Connection connection = connectionFactory.createConnection();
    // 开启链接
    connection.start();
    // 使用链接对象建立一个Session对象
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // 建立一个Destination对象,使用topic
    Topic topic = session.createTopic("test-topic");
    // 使用Session对象建立一个消费者
    MessageConsumer consumer = session.createConsumer(topic);
    System.out.println("topic消费者1。。。。");
    // 使用消费者对象接收消息
    consumer.setMessageListener(new MessageListener() {

        @Override
        public void onMessage(Message message) {
            // 打印消息
            TextMessage textMessage = (TextMessage) message;
            String text = "";
            try {
                text = textMessage.getText();
                System.out.println(text);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    });
    // 程序等待
    System.in.read();

    // 关闭资源
    consumer.close();
    session.close();
    connection.close();

}

咱们运行上面的方法,会看到控制台输出”topic消费者1。。。。”,为了模仿多个消费者,咱们修改输出信息为”topic消费者2。。。。”,而后再运行该方法,从而增长一个消费者,而后再修改输出信息为”topic消费者3。。。。”,再运行该方法,就会再增长一个消费者,从而如今有三个消费者,以下图所示。
这里写图片描述session

这里写图片描述

这里写图片描述
启动了三个消费者后,咱们再发送一次topic消息,发完以后,咱们看各个控制台的信息。以下图所示,能够看到都打印出了咱们发送的topic信息。
这里写图片描述tcp

这里写图片描述

这里写图片描述
咱们再看下activemq的管理后台页面,发现消费者如今有3个,压入队列的消息有两条(第一条发送时没有消费者),消费的消息有3条(这是由于有三个消费者,对于第二次发送的topic消息,这三个消费者各自消费了一次,所以显示的数量是3)。
这里写图片描述ide