原文:https://blog.csdn.net/liuyuanq123/article/details/79109218
ActiveMQ介绍
MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。
特点:
1、支持多种语言编写客户端
2、对spring的支持,很容易和spring整合
3、支持多种传输协议:TCP,SSL,NIO,UDP等
4、支持AJAX
消息形式:
1、点对点(queue)
2、一对多(topic)
pom中添加依赖:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> </dependency>
queue的发送代码:
public void testMQProducerQueue() throws Exception{ //1、创建工厂连接对象,需要制定ip和端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616"); //2、使用连接工厂创建一个连接对象 Connection connection = connectionFactory.createConnection(); //3、开启连接 connection.start(); //4、使用连接对象创建会话(session)对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) Queue queue = session.createQueue("test-queue"); //6、使用会话对象创建生产者对象 MessageProducer producer = session.createProducer(queue); //7、使用会话对象创建一个消息对象 TextMessage textMessage = session.createTextMessage("hello!test-queue"); //8、发送消息 producer.send(textMessage); //9、关闭资源 producer.close(); session.close(); connection.close(); }
接受的代码:
public void TestMQConsumerQueue() throws Exception{ //1、创建工厂连接对象,需要制定ip和端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616"); //2、使用连接工厂创建一个连接对象 Connection connection = connectionFactory.createConnection(); //3、开启连接 connection.start(); //4、使用连接对象创建会话(session)对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) Queue queue = session.createQueue("test-queue"); //6、使用会话对象创建生产者对象 MessageConsumer consumer = session.createConsumer(queue); //7、向consumer对象中设置一个messageListener对象,用来接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { // TODO Auto-generated method stub if(message instanceof TextMessage){ TextMessage textMessage = (TextMessage)message; try { System.out.println(textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }); //8、程序等待接收用户消息 System.in.read(); //9、关闭资源 consumer.close(); session.close(); connection.close(); }
topic发送代码:
public void TestTopicProducer() throws Exception{ //1、创建工厂连接对象,需要制定ip和端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616"); //2、使用连接工厂创建一个连接对象 Connection connection = connectionFactory.createConnection(); //3、开启连接 connection.start(); //4、使用连接对象创建会话(session)对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) Topic topic = session.createTopic("test-topic"); //6、使用会话对象创建生产者对象 MessageProducer producer = session.createProducer(topic); //7、使用会话对象创建一个消息对象 TextMessage textMessage = session.createTextMessage("hello!test-topic"); //8、发送消息 producer.send(textMessage); //9、关闭资源 producer.close(); session.close(); connection.close(); }
topic接受代码:
public void TestTopicConsumer() throws Exception{ //1、创建工厂连接对象,需要制定ip和端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616"); //2、使用连接工厂创建一个连接对象 Connection connection = connectionFactory.createConnection(); //3、开启连接 connection.start(); //4、使用连接对象创建会话(session)对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) Topic topic = session.createTopic("test-topic"); //6、使用会话对象创建生产者对象 MessageConsumer consumer = session.createConsumer(topic); //7、向consumer对象中设置一个messageListener对象,用来接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { // TODO Auto-generated method stub if(message instanceof TextMessage){ TextMessage textMessage = (TextMessage)message; try { System.out.println(textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }); //8、程序等待接收用户消息 System.in.read(); //9、关闭资源 consumer.close(); session.close(); connection.close(); }
与spring整合,applicationContext-activemq.xml文件:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd"> <!-- 配置能够产生connection的connectionfactory,由JMS对应的服务厂商提供 --> <bean id="tagertConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <constructor-arg name="brokerURL" value="tcp://192.168.156.44:61616"/> </bean> <!-- 配置spring管理真正connectionfactory的connectionfactory,相当于spring对connectionfactory的一层封装 --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="tagertConnectionFactory"/> </bean> <!-- 配置destination --> <!-- 队列目的地 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="spring-queue"/> </bean> <!-- 话题目的地 --> <bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="item-add-topic"/> </bean> <!-- 配置监听器 --> <bean id="myListener" class="com.taotao.search.listener.MyListener"/> <bean id="itemAddListener" class="com.taotao.search.listener.ItemAddListener"/> <!-- 系统监听器 --> <!-- <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="queueDestination"/> <property name="messageListener" ref="myListener"/> </bean> --> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="itemAddTopic"/> <property name="messageListener" ref="itemAddListener"/> </bean> </beans>