初识ActiveMQ及整合springboot

消息中间件的初步认识

什么是消息中间件?java

  消息中间件是利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通讯来进行分布式系统的集成。经过提供消息传递和消息排队模型,能够在分布式架构下扩展进程之间的通讯。web

消息中间件能作什么?spring

  消息中间件主要解决的就是分布式系统之间消息传递的问题,它可以屏蔽各类平台以及协议之间的特性,实现应用程序之间的协同。举个很是简单的例子,就拿一个电商平台的注册功能来简单分析下,用户注册这一个服务,不仅仅只是 insert 一条数据到数据库里面就完事了,还须要发送激活邮件、发送新人红包或者积分、发送营销短信等一系列操做。假如说这里面的每个操做,都须要消耗 1s,那么整个注册过程就须要耗时 4s 才能响应给用户。数据库

ActiveMQ 简介

ActiveMQ 是彻底基于 JMS 规范实现的一个消息中间件产品。是 Apache 开源基金会研发的消息中间件。ActiveMQ主要应用在分布式系统架构中,帮助构建高可用、高性能、可伸缩的企业级面向消息服务的系统ActiveMQ 特性apache

1. 多语言和协议编写客户端json

  语言:java/C/C++/C#/Ruby/Perl/Python/PHPspringboot

  应用协议 :session

  openwire/stomp/REST/ws/notification/XMPP/AMQP架构

2. 彻底支持 jms1.1 和 J2ee1.4 规范app

3. 对 spring 的支持,ActiveMQ 能够很容易内嵌到 spring模块中

ActiveMQ 安装

1. 登陆到 http://activemq.apache.org/components/classic/download/,找到 ActiveMQ 的下载地址 

 我这里用的是apache-activemq-5.15.10-bin.tar.gz ,jdk是1.8.0_161

2. 直 接 copy 到 服 务 器 上 通 过 tar -zxvf apache-activeMQ.tar.gz
3. 启动运行
  a) 普通启动:到 bin 目录下, sh activemq start
  b) 启 动 并 指 定 日 志 文 件 sh activemq start > /tmp/activemqlog
4. 检查是否已启动
  ActiveMQ默认采用 61616 端口提供 JMS服务,使用 8161端口提供管理控制台服务,执行如下命令能够检查是否成功启动 ActiveMQ 服务
  netstat -an|grep 61616

  能够经过./activemq console来查看日志。
5. 经过 http://192.168.11.156:8161 访问 activeMQ 管理页面 ,默认账号密码 admin/admin
6. 关闭 ActiveMQ; sh activemq stop

下面来看一下ActiveMQ的简单应用:

  消息的发布:

public static void main(String[] args) {
		
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.254.135:61616");
		Connection connection = null;
		try {

			connection = connectionFactory.createConnection();
			connection.start();
			// 延迟确认
			Session session = connection.createSession(Boolean.TRUE, Session.DUPS_OK_ACKNOWLEDGE);
			// 建立目的地
			Destination destination = session.createQueue("myQueue");
			// 建立消费者
			MessageProducer producer = session.createProducer(destination);
			TextMessage message = session.createTextMessage("Hello World");
			producer.send(message);
			// 表示消息被自动确认
			session.commit();
			session.close();
		} catch (JMSException e) {
			e.printStackTrace();
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}

  对应的客户端消费:

public static void main(String[] args) {
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.254.135:61616");
		Connection connection = null;
		try {

			connection = connectionFactory.createConnection();
			connection.start();
			// 延迟确认
			Session session = connection.createSession(Boolean.TRUE, Session.DUPS_OK_ACKNOWLEDGE);
			// 建立目的地
			Destination destination = session.createQueue("myQueue");
			// 建立消费者
			MessageConsumer consumer = session.createConsumer(destination);
			TextMessage textMessage = (TextMessage) consumer.receive();
			System.out.println(textMessage.getText());
			// 表示消息被自动确认
			session.commit();
			session.close();
		} catch (JMSException e) {
			e.printStackTrace();
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}

  若是须要作到消息监听的话:

public static void main(String[] args) { ConnectionFactory connectionFactory=
                new ActiveMQConnectionFactory ("tcp://192.168.1.101:61616"); Connection connection=null; try { connection=connectionFactory.createConnection(); connection.start(); Session session=connection.createSession (Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); //建立目的地
            Destination destination=session.createQueue("myQueue"); //建立发送者
            MessageConsumer consumer=session.createConsumer(destination); MessageListener messageListener=new MessageListener() { @Override public void onMessage(Message message) { try { System.out.println(((TextMessage)message).getText()); session.commit(); } catch (JMSException e) { e.printStackTrace(); } } }; consumer.setMessageListener(messageListener); System.in.read(); } catch (JMSException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { if(connection!=null){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } }

  基于订阅发布的消息发送:

public static void main(String[] args) {
        ConnectionFactory connectionFactory=
                new ActiveMQConnectionFactory
                        ("tcp://192.168.254.135:61616");
        Connection connection=null;
        try {
            connection=connectionFactory.createConnection();
            connection.start();
            Session session=connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
            //建立目的地
            Destination destination=session.createTopic("myTopic");
            //建立发送者
            MessageProducer producer=session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            //建立须要发送的消息
            TextMessage message=session.createTextMessage("topic -message");
            //Text   Map  Bytes  Stream  Object
            producer.send(message);
            session.commit();
//            session.rollback();
            session.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }finally {
            if(connection!=null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

  基于订阅发布的消息消费:这里须要先启动消费者

public static void main(String[] args) {
        ConnectionFactory connectionFactory=
                new ActiveMQConnectionFactory
                        ("tcp://192.168.254.135:61616");
        Connection connection=null;
        try {
            connection=connectionFactory.createConnection();
            connection.setClientID("wuzz");
            connection.start();
            Session session=connection.createSession
                    (Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
            //建立目的地
            Topic destination=session.createTopic("myTopic");
            //建立发送者
            MessageConsumer consumer=session.createDurableSubscriber(destination,"wuzz");
            TextMessage textMessage=(TextMessage) consumer.receive();
            System.out.println(textMessage.getText());
            session.commit(); //消息被确认
            session.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }finally {
            if(connection!=null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

  明白了ActiveMQ的基本使用,下面从源码的层面去学习一下ActIiveMQ的原理

springboot整合ActiveMQ:

1.pom.xml

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions><!-- 去掉springboot默认配置 -->
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency> <!-- 引入log4j2依赖 -->
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.0</version>
        </dependency>
    </dependencies>

2.application.yml:

server: port: 8881 spring: activemq: broker-url: tcp://192.168.1.101:61616
 user: admin password: admin pool: enabled: true packages: trust-all: true # 若是使用ObjectMessage传输对象,必需要加上这个信任包,不然会报ClassNotFound异常 jms: pub-sub-domain: true  # 启动主题消息

3.ActiveMqConfig 配置类:

/** * User: Wuzhenzhao * Date: 2019/12/9 * Time: 17:05 * Description: * ClassPath:com.wuzz.demo.integratedway1.config.ActiveMqConfig */ @Configuration public class ActiveMqConfig { // queue模式的ListenerContainer
 @Bean public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setConnectionFactory(activeMQConnectionFactory); return bean; } // topic模式的ListenerContainer
 @Bean public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setPubSubDomain(true); bean.setConnectionFactory(activeMQConnectionFactory); return bean; } }

4. MqProducer 生产者:

@Service public class MqProducer { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; /** * 发送字符串消息队列 * * @param queueName 队列名称 * @param message 字符串 */
    public void sendStringQueue(String queueName, String message) { this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), message); } /** * 发送字符串集合消息队列 * * @param queueName 队列名称 * @param list 字符串集合 */
    public void sendStringListQueue(String queueName, List<String> list) { this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), list); } /** * 发送对象消息队列 * * @param queueName 队列名称 * @param obj 对象 */
    public void sendObjQueue(String queueName, Serializable obj) { this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), obj); } /** * 发送对象集合消息队列 * * @param queueName 队列名称 * @param objList 对象集合 */
    public void sendObjListQueue(String queueName, List<Serializable> objList) { this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), objList); } /** * 发送字符串消息主题 * * @param topicName 主题名称 * @param message 字符串 */
    public void sendStringTopic(String topicName, String message) { this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), message); } /** * 发送字符串集合消息主题 * * @param topicName 主题名称 * @param list 字符串集合 */
    public void sendStringListTopic(String topicName, List<String> list) { this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), list); } /** * 发送对象消息主题 * * @param topicName 主题名称 * @param obj 对象 */
    public void sendObjTopic(String topicName, Serializable obj) { this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), obj); } /** * 发送对象集合消息主题 * * @param topicName 主题名称 * @param objList 对象集合 */
    public void sendObjListTopic(String topicName, List<Serializable> objList) { this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), objList); } }

4.队列消费者 QueueConsumer:

@Component public class QueueConsumer { @JmsListener(destination = "stringQueue", containerFactory = "jmsListenerContainerQueue") public void receiveStringQueue(String msg) { System.out.println("接收到消息...." + msg); } // @JmsListener(destination = "stringListQueue", containerFactory = "jmsListenerContainerQueue") // public void receiveStringListQueue(List<String> list) { // System.out.println("接收到集合队列消息...." + list); // } //
//
// @JmsListener(destination = "objQueue", containerFactory = "jmsListenerContainerQueue") // public void receiveObjQueue(ObjectMessage objectMessage) throws Exception { // System.out.println("接收到对象队列消息...." + objectMessage.getObject()); // } //
//
// @JmsListener(destination = "objListQueue", containerFactory = "jmsListenerContainerQueue") // public void receiveObjListQueue(ObjectMessage objectMessage) throws Exception { // System.out.println("接收到的对象队列消息..." + objectMessage.getObject()); // }
 }

5.主题消费者A ,这里为了测试topic消息,咱们使用两个消费者去订阅。ATopicConsumer:

@Component public class ATopicConsumer { @JmsListener(destination = "stringTopic", containerFactory = "jmsListenerContainerTopic") public void receiveStringTopic(String msg) { System.out.println("ATopicConsumer接收到消息...." + msg); } // @JmsListener(destination = "stringListTopic", containerFactory = "jmsListenerContainerTopic") // public void receiveStringListTopic(List<String> list) { // System.out.println("ATopicConsumer接收到集合主题消息...." + list); // } //
//
// @JmsListener(destination = "objTopic", containerFactory = "jmsListenerContainerTopic") // public void receiveObjTopic(ObjectMessage objectMessage) throws Exception { // System.out.println("ATopicConsumer接收到对象主题消息...." + objectMessage.getObject()); // } //
//
// @JmsListener(destination = "objListTopic", containerFactory = "jmsListenerContainerTopic") // public void receiveObjListTopic(ObjectMessage objectMessage) throws Exception { // System.out.println("ATopicConsumer接收到的对象集合主题消息..." + objectMessage.getObject()); // }
 }

  BTopicConsumer:

@Component public class BTopicConsumer { @JmsListener(destination = "stringTopic", containerFactory = "jmsListenerContainerTopic") public void receiveStringTopic(String msg) { System.out.println("BTopicConsumer接收到消息...." + msg); } // @JmsListener(destination = "stringListTopic", containerFactory = "jmsListenerContainerTopic") // public void receiveStringListTopic(List<String> list) { // System.out.println("BTopicConsumer接收到集合主题消息...." + list); // } //
//
// @JmsListener(destination = "objTopic", containerFactory = "jmsListenerContainerTopic") // public void receiveObjTopic(ObjectMessage objectMessage) throws Exception { // System.out.println("BTopicConsumer接收到对象主题消息...." + objectMessage.getObject()); // } //
//
// @JmsListener(destination = "objListTopic", containerFactory = "jmsListenerContainerTopic") // public void receiveObjListTopic(ObjectMessage objectMessage) throws Exception { // System.out.println("BTopicConsumer接收到的对象集合主题消息..." + objectMessage.getObject()); // } }

6.实体类 User:

public class User implements Serializable { private String id; private String name; private Integer age; public User() { } public User(String id, String name, Integer age) { this.id = id; this.name = name; this.age = age; }   //省略get set 跟 toString }

7.测试类:

@RestController public class TestController { @Autowired private MqProducer mqProducer; @RequestMapping(value = "/testStringQueue.json", method = {RequestMethod.GET}) public void testStringQueue() { for (int i = 1; i <= 100; i++) { System.out.println("" + i + "次发送字符串队列消息"); mqProducer.sendStringQueue("stringQueue", "消息:" + i); } } // @RequestMapping(value = "/testStringListQueue.json", method = {RequestMethod.GET}) // public void testStringListQueue() { //
// List<String> idList = new ArrayList<>(); // idList.add("id1"); // idList.add("id2"); // idList.add("id3"); //
// System.out.println("正在发送集合队列消息ing......"); // mqProducer.sendStringListQueue("stringListQueue", idList); // } //
//
// @RequestMapping(value = "/testObjQueue.json", method = {RequestMethod.GET}) // public void testObjQueue() { //
// System.out.println("正在发送对象队列消息......"); // mqProducer.sendObjQueue("objQueue", new User("1", "小明", 20)); // } //
//
// @RequestMapping(value = "/testObjListQueue.json", method = {RequestMethod.GET}) // public void testObjListQueue() { //
// System.out.println("正在发送对象集合队列消息......"); //
// List<Serializable> userList = new ArrayList<>(); // userList.add(new User("1", "小明", 21)); // userList.add(new User("2", "小雪", 22)); // userList.add(new User("3", "小花", 23)); //
// mqProducer.sendObjListQueue("objListQueue", userList); // }
 @RequestMapping(value = "/testStringTopic.json", method = {RequestMethod.GET}) public void testStringTopic() { for (int i = 1; i <= 100; i++) { System.out.println("" + i + "次发送字符串主题消息"); mqProducer.sendStringTopic("stringTopic", "消息:" + i); } } // @RequestMapping(value = "/testStringListTopic.json", method = {RequestMethod.GET}) // public void testStringListTopic() { //
// List<String> idList = new ArrayList<>(); // idList.add("id1"); // idList.add("id2"); // idList.add("id3"); //
// System.out.println("正在发送集合主题消息ing......"); // mqProducer.sendStringListTopic("stringListTopic", idList); // } //
//
// @RequestMapping(value = "/testObjTopic.json", method = {RequestMethod.GET}) // public void testObjTopic() { //
// System.out.println("正在发送对象主题消息......"); // mqProducer.sendObjTopic("objTopic", new User("1", "小明", 20)); // } //
//
// @RequestMapping(value = "/testObjListTopic.json", method = {RequestMethod.GET}) // public void testObjListTopic() { //
// System.out.println("正在发送对象集合主题消息......"); //
// List<Serializable> userList = new ArrayList<>(); // userList.add(new User("1", "小明", 21)); // userList.add(new User("2", "小雪", 22)); // userList.add(new User("3", "小花", 23)); //
// mqProducer.sendObjListTopic("objListTopic", userList); // }
}

  启动后访问对应接口就能够。