我相信你们经过前面的学习,已然知道了如何发送队列消息及消费队列消息。本文咱们将一块儿学习如何发送topic消息和接收topic消息。
咱们依然在TestActiceMQ单元测试类中添加一个测试方法,用来测试发送topic消息,以下图所示,其实这个方法与发送队列消息几乎同样,只是建立Destination对象的时候不同而已。
为了方便你们复制,现将testTopicProducer测试方法的代码贴出。java
@Test
public void testTopicProducer() throws Exception {
// 建立一个链接工厂对象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616");
// 使用链接工厂对象来建立一个链接
Connection connection = connectionFactory.createConnection();
// 开启链接
connection.start();
// 使用链接对象建立一个Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 使用Session对象来建立一个Topic
Topic topic = session.createTopic("test-topic");
// 使用Session对象来建立一个Producer,指定其目的地是Topic
MessageProducer producer = session.createProducer(topic);
// 建立一个TextMessage对象
TextMessage message = session.createTextMessage("使用topic来发送的消息");
// 使用Producer对象来发送消息
producer.send(message);
// 关闭资源
producer.close();
session.close();
connection.close();
}
运行上面的测试方法,运行成功后,咱们访问activemq的管理后台页面,点击”Topics”,能够看到有”test-topic”这一行,压入消息队列一条消息,但因为没有消费者,所以没有消费掉该消息。
咱们点击上图的”test-topic”,会看到以下图所示界面。咱们发现刚才咱们发送的消息并无被保存。
而咱们发送的queue消息在未被消费前会被保存。
这样的话,就会有个问题,那就是若是发送topic消息时没有消费者,那么这条消息便不复存在了,不会再被消费了。所以咱们要想消息不会被遗失掉,咱们要先打开消费者,而后再发送topic消息。
咱们来写消费topic消息的方法,以下图所示,该方法与咱们上文学习的消费队列消息的方法不一样的是建立Destination的时候不同,同时为了模拟多个消费者,在该方法中添加一条输出信息,标明该方法是第几个消费者。
为了方便你们复制,现将testTopicConsumer测试方法的代码贴出。web
@Test
public void testTopicConsumer() throws Exception {
// 建立一个链接工厂对象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616");
// 使用工厂对象建立一个链接
Connection connection = connectionFactory.createConnection();
// 开启链接
connection.start();
// 使用链接对象建立一个Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 建立一个Destination对象,使用topic
Topic topic = session.createTopic("test-topic");
// 使用Session对象建立一个消费者
MessageConsumer consumer = session.createConsumer(topic);
System.out.println("topic消费者1。。。。");
// 使用消费者对象接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
// 打印消息
TextMessage textMessage = (TextMessage) message;
String text = "";
try {
text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 程序等待
System.in.read();
// 关闭资源
consumer.close();
session.close();
connection.close();
}
咱们运行上面的方法,会看到控制台输出”topic消费者1。。。。”,为了模仿多个消费者,咱们修改输出信息为”topic消费者2。。。。”,而后再运行该方法,从而增长一个消费者,而后再修改输出信息为”topic消费者3。。。。”,再运行该方法,就会再增长一个消费者,从而如今有三个消费者,以下图所示。
session
启动了三个消费者后,咱们再发送一次topic消息,发完以后,咱们看各个控制台的信息。以下图所示,能够看到都打印出了咱们发送的topic信息。
tcp
咱们再看下activemq的管理后台页面,发现消费者如今有3个,压入队列的消息有两条(第一条发送时没有消费者),消费的消息有3条(这是由于有三个消费者,对于第二次发送的topic消息,这三个消费者各自消费了一次,所以显示的数量是3)。
ide