今天继续给你们分享的是ActiveMQ,若有不足,敬请指教。java
上次咱们说到,咱们发现消费者每次只能消费一条消息。当队列中有多条消息的时候,咱们须要屡次运行消费者,才能消费完这些消息。很麻烦!!!!如何解决这个问题呢?web
这就须要使用ActiveMQ监听器来监听队列,持续消费消息。redis
package com.xkt.listener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * @author lzx * */ public class MyListener implements MessageListener { @Override public void onMessage(Message message) { if (null != message) { if (message instanceof TextMessage) { try { TextMessage tMsg = (TextMessage) message; String content = tMsg.getText(); System.out.println("监听到的消息是 " + content); } catch (JMSException e) { e.printStackTrace(); } } } } }
package com.xkt.consumer; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import com.xkt.listener.MyListener; /** * @author lzx * */ public class Myconsumer { private ConnectionFactory factory; private Connection connection; private Session session; private Destination destination; private MessageConsumer consumer; public void receiveFromMq() { try { factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616"); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 建立目的地, 目的地命名即队列命名, 消息消费者须要经过此命名访问对应的队列 destination = session.createQueue("queue"); // 5.建立消息消费者, 建立的消息消费者与某目的地对应, 即方法参数目的地 consumer = session.createConsumer(destination); // 7.加载监听器 consumer.setMessageListener(new MyListener()); // 监听器须要持续加载,这里咱们使用输入流阻塞当前线程结束。监听指定队列,只要有消息进来,就消费这条消息 System.in.read(); // 在java项目中,能够经过IO阻塞程序,持续加载监听器 // 在web项目中,能够经过配置文件,直接加载监听器。 } catch (Exception e) { e.printStackTrace(); System.out.println("读取失败"); } finally { if (null != consumer) { try { consumer.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (null != session) { try { session.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (null != connection) { try { connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } }
图示 |
---|
![]() |
图示 |
---|
![]() |
图示 |
---|
![]() |
在以上示例中,只能向一个消费者发送消息。可是有一些场景,需求有多个消费者都能接收到消息,好比:美团APP天天的消息推送。该如何实现呢?apache
package com.xkt.subscriber; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnectionFactory; /** * @author lzx * */ public class MySubscriber implements Runnable { /** * 多线程的线程安全问题 解决方案: * * (1)加锁 --极不推荐 (2)不使用全局变量 ---> SpringMVC是线程安全的吗? 答:默认不是 解决办法:(1)使用原型模式--不推荐 * (2)不使用全局变量 (3)ThreadLocal (3)其它框架来代替,好比redis */ private TopicConnectionFactory factory; private TopicConnection connection; private TopicSession session; private Topic topic; private TopicSubscriber subscriber; private Message message; @Override public void run() { try { // 一、建立链接工厂 factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616"); // 二、建立链接 connection = factory.createTopicConnection(); connection.start(); // 三、建立会话 session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // 四、建立topic主题 topic = session.createTopic("topic-gzsxt"); // 五、建立订阅者 subscriber = session.createSubscriber(topic); // 六、订阅 while (true) { message = subscriber.receive(); if (null != message) { if (message instanceof TextMessage) { TextMessage tMsg = (TextMessage) message; String content = tMsg.getText(); System.out.println("订阅者: " + Thread.currentThread().getName() + " 接收的消息是:" + content); } } } } catch (JMSException e) { e.printStackTrace(); } } }
package com.xkt.test; import com.xkt.subscriber.MySubscriber; /** * @author lzx * */ public class TestMQ { public static void main(String[] args) { MySubscriber sub = new MySubscriber(); Thread t1 = new Thread(sub); Thread t2 = new Thread(sub); t1.start(); t2.start(); } }
package com.xkt.publish; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import org.apache.activemq.ActiveMQConnectionFactory; /** * @author lzx * */ public class MyPublisher { private TopicConnectionFactory factory; private TopicConnection connection; private TopicSession session; private Topic topic; private TopicPublisher publisher; private Message message; public void publish(String msg) { try { // 一、建立链接工厂 factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616"); // 二、建立链接 connection = factory.createTopicConnection(); connection.start(); // 三、建立会话 session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // 四、建立topic主题 topic = session.createTopic("topic-gzsxt"); // 五、建立发布者 publisher = session.createPublisher(topic); // 六、建立消息对象 message = session.createTextMessage(msg); // 七、发布消息 publisher.publish(message); } catch (Exception e) { e.printStackTrace(); } finally { if (null != publisher) { try { publisher.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (null != session) { try { session.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (null != connection) { try { connection.stop(); connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (null != session) { try { session.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } }
package com.xkt.test; import org.junit.Test; import com.xkt.publish.MyPublisher; import com.xkt.subscriber.MySubscriber; /** * @author lzx * */ public class TestMQ { public static void main(String[] args) { MySubscriber sub = new MySubscriber(); Thread t1 = new Thread(sub); Thread t2 = new Thread(sub); t1.start(); t2.start(); } @Test public void publish() { MyPublisher publisher = new MyPublisher(); publisher.publish("hello,欢迎收听FM 89.9频道-交通频道"); } }
2.2.6 查看测试结果安全
版权说明:欢迎以任何方式进行转载,但请在转载后注明出处!服务器