ActiveMQ学习总结------原生实战操做(下)03

本篇将继续延续上一篇的内容,做为知识补充篇,为接下来咱们学习spring整合ActiveMQ打好基础

本篇主要学习内容:html

  1.ActiveMQ 队列服务监听java

  2.ActiveMQ Topic模型spring


 

回顾下上一篇ActiveMQ学习总结咱们学习到了:

  1.ActiveMQ术语及API介绍apache

  2.ActiveMQ 文本消息处理session

  3.ActiveMQ 对象消息处理多线程

相信大如今对ActiveMQ的一些简单操做已经很轻松掌握了tcp

上一篇文章地址:https://www.cnblogs.com/arebirth/p/activemq02.htmlide


 

 

一 ActiveMQ实现队列服务监听

在咱们上一篇的练习中,全部的消费者都是接收一次消息即断开链接,这样是否是很不方便。学习

试想一下,若是咱们的provider在consumer接收完第一条消息后又继续发送了一条消息,那么consumer已经断开链接了,是否是就不能链接不间断的实时获取消息?测试

解决方案:

  很容易,用咱们的队列服务监听便可

 

*:根据上一章的学习,你们对环境搭建使用配置,确定都已经至关清楚了,这里就不过多阐述,直接进行代码实战

 

1 消息生产者

相比之下,我么你的生产者照以前是没有任何变化的,主要的变化仍是在cosumer身上

package cn.arebirth.mq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ActiveMQQueueListenerProducer { public static void sendTextActiveMq(String txt) { //定义连接工厂
        ConnectionFactory connectionFactory = null; //定义连接对象
        Connection connection = null; //定义会话
        Session session = null; //目的地
        Destination destination = null; //定义消息的发送者
        MessageProducer producer = null; //定义消息
        Message message = null; try { //建立连接工厂
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616"); //建立连接诶对象
            connection = connectionFactory.createConnection(); //启动连接
 connection.start(); //建立会话
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立目的地
            destination = session.createQueue("queue-listener"); //建立消息生产者
            producer = session.createProducer(destination); //建立消息对象
            message = session.createTextMessage(txt); //发送消息
 producer.send(message); } catch (Exception ex) { ex.printStackTrace(); } finally { //回收资源
            if (producer != null) { try { producer.close(); } catch (JMSException e) { e.printStackTrace(); } } if (session != null) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }

 

2 消息消费者

package cn.arebirth.mq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ActiveMQQueueListenerConsumer { public static void receiveTextActiveMq() { // 定义连接工厂
        ConnectionFactory connectionFactory = null; // 定义连接对象
        Connection connection = null; // 定义会话
        Session session = null; // 目的地
        Destination destination = null; // 定义消息的发送者
        MessageConsumer consumer = null; // 定义消息
        Message message = null; try { //建立连接工厂
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616"); //建立连接对象
            connection = connectionFactory.createConnection(); //启动连接
 connection.start(); //建立会话
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立目的地
            destination = session.createQueue("queue-listener"); //建立消息消费者
            consumer = session.createConsumer(destination); //队列服务监听
            consumer.setMessageListener(new MessageListener() { //ActiveMQ回调方法。经过该方法将消息传递到consumer
 @Override public void onMessage(Message message) { //处理消息
                    String msg = null; try { msg = ((TextMessage) message).getText(); } catch (JMSException e) { e.printStackTrace(); } System.out.println("Producer say:" + msg); } }); } catch (Exception ex) { ex.printStackTrace(); } } }

 

3 测试

3.1 provider测试

package cn.arebirth.mq; public class ProducerTest { public static void main(String[] args) { ActiveMQQueueListenerProducer.sendTextActiveMq("Hello,consumer!"); } }

观察咱们的控制台能够发现已经成功发布到队列

 

 

 

3.2 consumer测试

package cn.arebirth.mq; public class ConsumerTest { public static void main(String[] args) { ActiveMQQueueListenerConsumer.receiveTextActiveMq(); } }

咱们运行后能够发现,它接收到了消息,可是它的进程并无关闭,

 

 

咱们用provider继续发布一条消息,看看consumer能不能接收到

 

 

能够看到,consumer持续在后台监听咱们发布的消息,

 

 

 

 

 

 

经过上面代码,不难发现,provider没有任何改动,只是consumer修改了一部分

经过调用匿名内部类的方法来实现持续监听

consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { } }

注意:由于涉及到队列持续监听,因此咱们不能在finally处给资源回收,不然还在监听状态,资源都回收没了,也就无从监听啦。

 


 

 

二 Topic模型

在本系列文章第一篇也有介绍过一些Topic模型的概念,那么这里咱们将以原理+实战的方式来带领你们掌握

 

1 Publish/Subscribe处理模式(Topic)

消息生产者(发布)消息到topic中,同时有多个消息消费者(订阅)消费该消息。

和点对点方式不一样,发布到Topic的消息会被全部的订阅者消费,而点对点的只能是指定的消费者去消费

当生产者发布消息,无论是否有消费者,都不会保存消息,也就是说它是发完就啥也无论了那种,

因此要注意:必定要有消费者,而后在有生产者,不然生产者不发完消息什么也无论了,你消费者在生产者以后才有,那么你是接收不到消息的。

 

接下来咱们就以实战的方式鼓捣下。

 

2 建立生产者

package cn.arebirth.mq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ActiveMQTopicProducer { public static void sendTextActiveMQ(String txt){ //定义连接工厂
        ConnectionFactory connectionFactory = null; //定义连接对象
        Connection connection = null; //定义会话
        Session session = null; //目的地
        Destination destination = null; //定义消息的发送者
        MessageProducer producer = null; //定义消息
        Message message = null; try { //建立连接工厂
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616"); //建立连接诶对象
            connection = connectionFactory.createConnection(); //启动连接
 connection.start(); //建立会话
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立目的地
            destination = session.createTopic("topic-test"); //建立消息生产者
            producer = session.createProducer(destination); //建立消息对象
            message = session.createTextMessage(txt); //发送消息
 producer.send(message); } catch (Exception ex) { ex.printStackTrace(); } finally { //回收资源
            if (producer != null) { try { producer.close(); } catch (JMSException e) { e.printStackTrace(); } } if (session != null) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }

咱们能够发现,在建立目的地destination的时候代码有了变更

destination = session.createTopic("topic-test");

变成了createTopic,对这就是topic模式了。

 

3 建立消费者

package cn.arebirth.mq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ActiveMQTopicConsumer implements Runnable { public static void receiveTextActiveMQ(String threadName) { // 定义连接工厂
        ConnectionFactory connectionFactory = null; // 定义连接对象
        Connection connection = null; // 定义会话
        Session session = null; // 目的地
        Destination destination = null; // 定义消息的发送者
        MessageConsumer consumer = null; // 定义消息
        Message message = null; try { //建立连接工厂
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616"); //建立连接对象
            connection = connectionFactory.createConnection(); //启动连接
 connection.start(); //建立会话
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立目的地
            destination = session.createTopic("topic-test"); //建立消息的消费者
            consumer = session.createConsumer(destination); //服务监听
            consumer.setMessageListener(new MessageListener() { //ActiveMQ回调方法。经过该方法将消息传递到consumer
 @Override public void onMessage(Message message) { //处理消息
                    String msg = null; try { msg = ((TextMessage) message).getText(); } catch (JMSException e) { e.printStackTrace(); } System.out.println(threadName + "--Producer say:" + msg); } }); } catch (Exception ex) { ex.printStackTrace(); } } @Override public void run() { receiveTextActiveMQ(Thread.currentThread().getName()); } }

 

咱们能够发现,在建立目的地destination的时候代码有了变更

destination = session.createTopic("topic-test");

还有实现了Runnable这个是为了一会测试的时候,多线程启动,看效果,是否多个都会接受到,(若是看着糊涂的话,你也能够去掉线程的部分,单独复制多个对象,并启动,效果也是同样的)

 

4 测试(要先启动消费者,不然消费者是接收不到消息的!固然,你本身能够试一下

4.1 测试消费者

package cn.arebirth.mq; public class ConsumerTest { public static void main(String[] args) { ActiveMQTopicConsumer a1 = new ActiveMQTopicConsumer(); Thread t1 = new Thread(a1,"a1"); ActiveMQTopicConsumer a2 = new ActiveMQTopicConsumer(); Thread t2 = new Thread(a2,"a2"); ActiveMQTopicConsumer a3 = new ActiveMQTopicConsumer(); Thread t3 = new Thread(a3,"a3"); t1.start(); t2.start(); t3.start(); } }

 

能够看到,咱们的消费者已经启动了,三个线程。并以监听服务的方式启动

 

 

4.2 测试生产者

package cn.arebirth.mq; public class ProducerTest { public static void main(String[] args) { ActiveMQTopicProducer.sendTextActiveMQ("hello,topic"); } }

 

能够看到,在topics下面,咱们发布的内容已经有记录了

 

 

 

而后咱们在看下,咱们的consumer

 

 

 

能够发现,三个consumer都已经接收到了

 

ps:

  若是你对ActiveMQ原理性的东西感到困惑,能够看下咱们前面的文章:https://www.cnblogs.com/arebirth/p/activemq01.html

 

原文出处:https://www.cnblogs.com/arebirth/p/activemq03.html

相关文章
相关标签/搜索