根据地址http://activemq.apache.org/download.html 找到activeMQ的下载地址,下载相应的版本html
1.win安装activeMQjava
打开下载文件下的bin文件找到对应电脑版本(32位或是64位)下的的启动程序web
其中有两个启动程序spring
第一个是以DOS命令行的形式双击启动apache
当看到浏览器访问地址时即为启动成功,须要注意的是在整个MQ的运行中,该命令窗口是不能被关闭的.浏览器
若是以这种形式的启动方式不方便的话可使用第二个启动程序,以服务的形式启动activeMQ,session
双击InstallService.bat文件,而后在服务中启动便可app
无论以哪一种方式启动,成功后都应该在浏览器中访问127.0.0.1:8161成功访问,其中ip位实际机器IP,8161为activeMQ web管理端口.,在后续的使用中将会用到activeMQ的默认端口61616,这些在conf配置中均可修改maven
值得注意的是 ,若是在启动过程当中在端口没有没占用状况下任启动失败 ,则启动bat文件时右键以管理员身份运行.tcp
2.Liunx 安装activeMQ
在官网上 找到Liunx的安装版本 ,也能够复制地址,在Liunx使用wget 在线下载,固然这是在Liunx能链接外网的状况下.在这里我使用官网提供的地址http://www.apache.org/dyn/closer.cgi?filename=/activemq/5.15.0/apache-activemq-5.15.0-bin.tar.gz&action=download
1.cd /opt 将tar.gz文件上传到这里
2.解压缩 tar -zxvf apache-activemq-5.15.0-bin.tar.gz
3.启动:
cd /opt/apache-activemq-5.15.0/bin
sudo ./activemq start
4.端口开放
须要开放8161(web管理页面端口)、61616(activemq服务监控端口) 两个端口
firewall-cmd –zone=public –add-port=8161/tcp –permanent
firewall-cmd –zone=public –add-port=61616/tcp –permanent
firewall-cmd –reload
5.打开web管理页面
http://IP:8161/admin
一样的以这种方式启动,终端也是不能关闭的.
对于JMS提供的API在上一章节已经了解,在这里以简单的java project 来编写JMS的HelloWord
Queue:
1.建立生产者:
package com.jms.produce; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class QueueProducer { private static final String url="tcp://localhost:61616";//activeMQ地址 private static final String queueName="QUEUE_TEST";//队列名称 private static ConnectionFactory connectionFactory;//链接工厂 private static Connection connection;//链接 private static Session session ;//链接会话 private static Destination destination;//目的地 private static MessageProducer producer;//消息提供者--生产者 public static void main(String[] args) throws JMSException{ try { //建立链接工厂 connectionFactory=new ActiveMQConnectionFactory(url); //建立链接 connection=connectionFactory.createConnection(); //启动链接 session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立队列(目的地) destination=session.createQueue(queueName); //建立生成者 producer=session.createProducer(destination); for (int i = 0; i < 10; i++) { TextMessage message=session.createTextMessage("test" + i); producer.send(message); System.out.println("发送消息:"+message.getText()); } }catch (JMSException e){ e.printStackTrace(); }finally {//关闭链接 if (connection != null) try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } }
2.建立消费者:
package com.jms.consumer; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class QueueConsumer { private static final String url="tcp://localhost:61616";//activeMQ地址 private static final String queueName="QUEUE_TEST";//队列名称 private static ConnectionFactory connectionFactory ;//链接工厂 private static Session session;//链接会话 private static Destination destination ;//目的地 private static MessageConsumer messageConsumer ;//消息消费者--消费者 public static void main(String[] args) { try { //建立链接工厂 connectionFactory = new ActiveMQConnectionFactory(url); //建立链接 Connection connection = connectionFactory.createConnection(); //启动链接 connection.start(); //建立session会话 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立队列(目的地) destination=session.createQueue(queueName); //建立一个消费者 messageConsumer=session.createConsumer(destination); //消息监听器 messageConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接收消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); }catch (JMSException e){ e.printStackTrace(); } } }
生产者和消费者queueName应该保持一致,不然介绍不到消息提供者发送的消息.应该尽可能保证先启动Consumer.(若是先启动Producer,则消息会被第一个启动的Consumer所有消费掉)
能够看到消息已经发送到了队列"QUEUE_TEST",能够在web中查看队列的.当建立多个消费者监听同一个队列时,多个消费者平均消费队列中的消息.
Topic:
主题模式的代码与队列模式大同小异,只是在建立目的地是有所不一样,其它的工厂对象,会话等一致.在此不赘述.
//建立主题(目的地) destination=session.createTopic(topicName);
//建立队列(目的地) destination=session.createQueue(queueName);
使用java提供的JMS规范APi做为了解jms的基础知识,在此基础上使用Spring整合Jms提升编码效率.
针对冗长和重复的JMS代码,Spring给出的解决方案市JmsTemplate.JmsTemplate能够建立链接,得到会话以及发送和接收消息.这时的咱们能够专一与构建要发送的消息或者处理接收到的消息. 同时,也可出对抛出的异常作分析而非统一的抛出JMSException.
1.首先导入spring,activeMQ所须要的Jar包.
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.chuyu</groupId> <artifactId>jms-test</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.14.2</version> <exclusions> <exclusion> <!-- 移除-spring-context --> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> </exclusion> </exclusions> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-spring --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-spring</artifactId> <version>5.14.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework/spring-context --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.3.11.RELEASE</version> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework/spring-jms --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.3.11.RELEASE</version> </dependency> </dependencies> </project>
2.建立xml配置文件
commApp.xml(生产者和消费者共用的配置)
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <context:annotation-config/> <!-- ActiveMq 提供的链接工厂 默认端口为61616 --> <bean id="targetConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>tcp://localhost:61616</value> </property> </bean> <!--spring Jms 提供的链接池 --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="targetConnectionFactory"/> </bean> <!-- 目的地类型及名称--> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="QUEUE.TEST"/> </bean> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="TOPIC.TEST"/> </bean> <!-- Jms模板--> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> </bean> </beans>
3.编写发送消息的接口服务
SendMessageImpl
package com.jms.services.servicesImpl; import com.jms.services.SendDataServices; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Component; import javax.annotation.Resource; import javax.jms.*; public class SendMessageImpl implements SendDataServices { @Autowired private JmsTemplate jmsTemplate; @Resource(name = "topicDestination") private Destination destination; //@Resource(name = "queueDestination") //private Destination destination; @Override public void sendMessge(final String message) { jmsTemplate.send(destination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { TextMessage textMessage= session.createTextMessage(message); return textMessage; } }); } }
3.配置生产者
producer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd"> <import resource="commApp.xml"/> <bean id="sendMessageImpl" class="com.jms.services.servicesImpl.SendMessageImpl"/> </beans>
至今生产者已配置完.
配置消费者
consumer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd"> <import resource="commApp.xml"/> <bean id="receiveDataService" class="com.jms.services.servicesImpl.ReceiveDataServiceImpl"> <!--在此能够配置spring的依赖--> </bean> <!-- 消息监听实现方法 一 --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="topicDestination" /> <property name="messageListener" ref="pureMDPAdapter"/> <!-- <property name="messageListener" ref="consumerListener"/>--> </bean> <!--实现MessageListener 方法的消息监听器 --> <!--<bean id="consumerListener" class="com.jms.spring_jms.ConsumerListeer"/>--> <!-- 消息监听器 MDB(message driver bean 消息驱动bean)--> <bean id="pureMDPAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <property name="delegate" ref="receiveDataService"/> <property name="defaultListenerMethod" value="receiveData"/> </bean> </beans>
在配置消息监听的时候.有两种方法,一种是实现MessageListener,而另外一种则是配置基于消息驱动的监听器(MDB),
针对两种不一样实现方法
package com.jms.spring_jms; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class ConsumerListeer implements MessageListener {//实现JMS规范定义的接口 @Override public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收消息到的消息为:"+textMessage.getText()); }catch ( JMSException e){ e.printStackTrace(); } } }
package com.jms.services.servicesImpl; import com.jms.services.ReciveDataServices; public class ReceiveDataServiceImpl implements ReciveDataServices{ @Override public void receiveData(String message) {//这里定义的接收对象类型需与发送的对象类型相匹配 System.out.println("接收:"+message); } }
生产者,和消费者以及监听已经配置完毕,
编写测试类
package com.jms.spring_jms.producer; import com.jms.services.SendDataServices; import org.apache.xbean.spring.context.ClassPathXmlApplicationContext; public class QueueProducer { private static SendDataServices sendMessageImpl; public static void main(String[] args) { ClassPathXmlApplicationContext applicationContext=new ClassPathXmlApplicationContext("producer.xml"); sendMessageImpl=(SendDataServices) applicationContext.getBean("sendMessageImpl"); for (int i = 0; i <10 ; i++) { sendMessageImpl.sendMessge("发送消息:" + i); System.out.println("发送消息:"+i); } // applicationContext.close(); } }
package com.jms.spring_jms.consumer; import org.apache.xbean.spring.context.ClassPathXmlApplicationContext; import org.springframework.context.support.AbstractApplicationContext; public class QueueReceive { public static void main(String[] args) { AbstractApplicationContext applicationContext=new ClassPathXmlApplicationContext("consumer.xml"); } }
对应队列模式的启动顺序没有要求,主题模式须要先启动消费者,即须要先订阅消息才能接收到消息.能够启动多个消费者查看消息的消费状况,也能够访问浏览器查看生产者,消费者,以及链接的状况.
对于Sping整合JMS的简单应用到此结束.因为篇幅及时间有限,对于配置中的MessageListener接口的实现类没有作解释,能够参考http://elim.iteye.com/blog/1893676这篇文章作进一步了解.
后续文章,将了解Spring整合Jms中,对于事务的管理,以及activeMQ的集群与分布式部署状况下的分布式事务解决方案.
---------------over.