前面讲的案例都是点对点的消息,即一个生产者发送的一条消息只能被一个消费者消费,然后就移除了。
而topic模式一条消息可以被多个消费者订阅,关系如下:
package com.dpb.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import org.apache.activemq.ActiveMQConnectionFactory; /** * ActiveMQ中的生产者(Producer) * @author dengp * */ public class MyProducer { public void sendhello2ActiveMq(String messageText) { TopicSession session = null; TopicConnection conn = null; try { TopicConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616"); conn = factory.createTopicConnection(); conn.start(); session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // 创建消息队列 Topic topic = session.createTopic("test-topic"); // 创建消息发送者 TopicPublisher publisher = session.createPublisher(topic); // 设置持久化模式 NON_PERSISTENT不开启 PERSISTENT 开启 默认是开启 publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); MapMessage mapMessage = session.createMapMessage(); mapMessage.setString("name", "波波烤鸭"); mapMessage.setString("address", "深圳"); publisher.send(mapMessage); // 提交会话 session.commit(); } catch (Exception e) { e.printStackTrace(); System.out.println("访问ActiveMQ服务发生错误!!"); } finally { try { // 回收会话资源 if (null != session) session.close(); } catch (JMSException e) { e.printStackTrace(); } try { // 回收链接资源 if (null != conn) conn.close(); } catch (JMSException e) { e.printStackTrace(); } } } }
package com.dpb.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import org.apache.activemq.ActiveMQConnectionFactory; /** * ActiveMQ中的消费者(Consumer) * @author dengp * */ public class MyConsumer { public void reciveHelloFormActiveMq() { TopicSession session = null; TopicConnection conn = null; try { TopicConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616"); conn = factory.createTopicConnection(); conn.start(); session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // 创建消息队列 Topic topic = session.createTopic("test-topic"); // 创建消息接受者 MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message msg) { if (msg != null) { MapMessage map = (MapMessage) msg; try { System.out.println(map.getString("name") + "接收#" + map.getString("address")); } catch (JMSException e) { e.printStackTrace(); } } } }); // 休眠100s再关闭 Thread.sleep(1000 * 100); // 提交会话 session.commit(); } catch (Exception e) { e.printStackTrace(); System.out.println("访问ActiveMQ服务发生错误!!"); } finally { try { // 回收会话资源 if (null != session) session.close(); } catch (JMSException e) { e.printStackTrace(); } try { // 回收链接资源 if (null != conn) conn.close(); } catch (JMSException e) { e.printStackTrace(); } } } }
先启动消费者,可以开启多个
public static void main(String[] args) { MyConsumer con = new MyConsumer(); con.reciveHelloFormActiveMq(); }
启动生产者
public static void main(String[] args) { MyProducer pro = new MyProducer(); pro.sendhello2ActiveMq("你好啊...topic"); }
好了本文介绍到此,下篇介绍ActiveMQ和Spring的整合