ActiveMQ是Apache出品的,很是流行的消息中间件,能够说要掌握消息中间件,须要从ActiveMQ开始。首先去官网下载:ActiveMQ官网 html
1.1,ActiveMQ目录 java
binweb |
存放的是ActiveMQ的启动脚本activemq.bat。apache |
conf浏览器 |
里面是配置文件,重点关注的是activemq.xml、jetty.xml、jetty-realm.properties。在登陆ActiveMQ Web控制台须要用户名、密码信息、在JMS CLIENT和ActiveMQ进行何种协议的链接、端口等这些信息都在上面的配置文件中能够体现。session |
data并发 |
目录下是ActiveMQ进行消息持久化存放的地方,默认采用的是kahadb,固然咱们能够采用leveldb,或者采用JDBC存储到MySQL,或者干脆不使用持久化机制。app |
webappswebapp |
注意ActiveMQ自带Jetty提供Web管控台。ide |
lib |
中ActiveMQ为咱们提供了分功能的JAR包。 |
1.2,ActiveMQ启动
CMD进入ActiveMQ所在的bin目录,输入ActiveMQ start便可启动ActiveMQ,这里必须已经安装JDK,须要注意JDK版本号。
浏览器输入localhost:8161便可访问ActiveMQ web后台管理
生产的消息查看方式:
Number Of Pending Messages |
还有多少条消息没有被消费,其实是表示消息的积压程度,就是P-C |
Number Of Consumers |
在该队列上还有多少消费者在等待接受消息 |
Messages Dequeued |
消费了多少条消息,记作C |
Messages Enqueued |
生产了多少条消息,记作P |
ActiveMQ 各个版本所依赖的JDK版本:
能够经过查看文件 activemq-all-*.jar包里面的\META-INF\MANIFEST.MF 属性值 Build-Jdk
1.3,ActiveMQ配置文件
jetty.xml |
Jetty web容器的配置文件,里面能够修改ActiveMQ web后台的端口号。 |
jetty-realm.properties |
这里保存了ActiveMQ web控制台的用户名、密码。 |
activemq.xml |
内容太多,单独说。 |
pom.xml中添加ActiveMQ依赖:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.14.5</version> </dependency> |
ActiveMQ接受/发送消息流程图:
2.1,ActiveMQLinkUtil帮助类:
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ActiveMQLinkUtil { private static final Logger logger = LoggerFactory.getLogger(ActiveMQLinkUtil.class); //ActiveMQ 的默认用户名 private static String USERNAME = ActiveMQConnection.DEFAULT_USER; //ActiveMQ 的默认登陆密码 private static String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //ActiveMQ 的连接地址 private static String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL; //连接工厂对象 private static ConnectionFactory connectionFactory = null; //连接对象 private static Connection connection = null; //会话对象 private static Session session = null; /** * * @方法名: initConnection * @描述: 初始化Connection,并返回Session对象,默认开启事务和自动签收 * @return Session对象 */ public static Session initConnection(){ try { String parameters = String.format("链接参数:userName=%s, pwd=%s, url=%s", USERNAME, PASSWORD, BROKEN_URL); logger.info(parameters); //第一步:建立一个连接工厂 connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL); //第二步:从工厂中建立一个连接 connection = connectionFactory.createConnection(); //开启连接(默认是关闭的) connection.start(); //第三步:建立session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); logger.info("Init Connection success"); } catch (JMSException e) { e.printStackTrace(); } return session; } /** * * @方法名: initConnection * @描述: 初始化Connection * @param transactionState 是否开启事务 * @param model 签收模式 * @return Session对象 */ public static Session initConnection(Boolean transactionState, int model){ try { String parameters = String.format("链接参数:userName=%s, pwd=%s, url=%s", USERNAME, PASSWORD, BROKEN_URL); logger.info(parameters); connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(transactionState, model); logger.info("Init Connection success"); } catch (JMSException e) { e.printStackTrace(); } return session; } /** * * @方法名: updateLinkParameter * @描述: 修改默认ActiveMQ的链接属性 * @param userName * @param pwd * @param url */ public static void updateLinkParameter(String userName, String pwd , String url){ if(!StringUtils.isBlank(userName)){ USERNAME = userName; } if(!StringUtils.isBlank(pwd)){ PASSWORD = pwd; } if(!StringUtils.isBlank(url)){ BROKEN_URL = url; } } /** * * @方法名: closeConnection * @描述: 关闭链接 */ public static void closeConnection(){ if(connection != null){ try { connection.close(); logger.info("close Connection success"); } catch (JMSException e) { e.printStackTrace(); } } } }
2.2,建立生产者
package com.zender.activemq; import javax.jms.DeliveryMode; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; /** * * @类名称:ActiveMQProducter * @类描述:生产者 */ public class ActiveMQProducter { //会话对象 private static Session session = null; public static void sendMessages(String name){ try { session = ActiveMQLinkUtil.initConnection(); if(session != null){ //第四步:建立一个消息目标,在PTP模式下是Queue,pub/sub模式下是topic Queue queue = session.createQueue(name); //第五步:建立一个消息生产者 MessageProducer messageProducer = session.createProducer(queue); //第六步:设置持久化方式,默认设置为PERSISTENT(持久性) messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); for (int i = 0; i < 5; i++) { Thread.sleep(1000); //第七步:建立文本消息,并发送 TextMessage textMessage = session.createTextMessage("消息内容" + (i + 1 )); System.out.println("消息内容:" + textMessage.getText()); //使用生产者,发送消息 messageProducer.send(textMessage); //提交事务 session.commit(); } } } catch (Exception e) { e.printStackTrace(); } finally { //第八步:释放链接 ActiveMQLinkUtil.closeConnection(); } } public static void main(String[] args) { ActiveMQProducter.sendMessages("Zender-MQ"); } }
2.3,建立消费者
import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; /** * * @类名称:ActiveMQComsumer * @类描述:消费者 */ public class ActiveMQComsumer { //会话对象 private static Session session = null; public static void getMessages(String name){ try { session = ActiveMQLinkUtil.initConnection(); if(session != null){ //建立一个消息队列 Queue queue = session.createQueue(name); //建立一个消息消费者 MessageConsumer messageConsumer = session.createConsumer(queue); for (int i = 0; i < 5; i++) { Thread.sleep(1000); //获取消息 TextMessage msg = (TextMessage) messageConsumer.receive(); //确认接到消息,这条代码能够不写,上面设置了自动消息确认 //msg.acknowledge(); System.out.println("消息内容:" + msg.getText()); } } } catch (Exception e) { e.printStackTrace(); } finally { ActiveMQLinkUtil.closeConnection(); } } public static void main(String[] args) { ActiveMQComsumer.getMessages("Zender-MQ"); } }
2.4,运行,查看结果
先运行生产者,再运行消费者。
生产者日志:
消费者日志:
在经过Connection建立Session的时候,须要设置2个参数,一个是否支持事务,另外一个是签收的模式。重点说一下签收模式,ActiveMQ支持一下三种模式:
AUTO_ACKNOWLEDGE |
表示在消费者receive消息的时候自动的签收 |
CLIENT_ACKNOWLEDGE |
表示消费者receive消息后必须手动的调用acknowledge()方法进行签收 |
DUPS_OK_ACKNOWLEDGE |
签不签收无所谓了,只要消费者可以容忍重复的消息接受,固然这样会下降Session的开销 |
在实际中,咱们应该采用CLIENT_ACKNOWLEDGE这种签收模式,采用手动的方式较自动的方式可能更好些,由于接收到了消息,并不意味着成功的处理了消息,假设咱们采用手动签收的方式,只有在消息成功处理的前提下才进行签收,那么只要消息处理失败,那么消息还有效,仍然会继续消费,直至成功处理。
JMS Selectors,即消息选择器,前面介绍过消息的组成部分,其中谈到消息对象有消息属性,用于消息选择器。咱们来看一段代码:
生产者:
package com.zender.activemq.selectors; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import com.zender.activemq.ActiveMQLinkUtil; /** * * @类名称:ActiveMQSelectorsProducter * @类描述:生产者-JSM选择器-消息选择器 */ public class ActiveMQSelectorsProducter { //会话对象 private static Session session = null; public static void sendMessages(String name){ try { session = ActiveMQLinkUtil.initConnection(false, Session.CLIENT_ACKNOWLEDGE); if(session != null){ //建立一个消息目标 Queue queue = session.createQueue(name); //建立一个消息生产者 MessageProducer messageProducer = session.createProducer(queue); for (int i = 0; i < 10; i++) { //建立消息,并发送 TextMessage textMessage = session.createTextMessage("消息A-"+i); textMessage.setStringProperty("JMSXtestId", "a"); //使用生产者,发送消息 messageProducer.send(textMessage); TextMessage textMessage1 = session.createTextMessage("消息B-"+i); textMessage1.setStringProperty("JMSXtestId", "b"); messageProducer.send(textMessage1); } } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { ActiveMQSelectorsProducter.sendMessages("SelectorsDemo"); } }
消费者:
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import com.zender.activemq.ActiveMQLinkUtil; /** * * @类名称:ActiveMQSelectorsComsumer * @类描述:消费者-JSM选择器-消息选择器 */ public class ActiveMQSelectorsComsumer { //会话对象 private static Session session = null; @SuppressWarnings("unchecked") public static void getMessages(String name){ try { session = ActiveMQLinkUtil.initConnection(false, Session.CLIENT_ACKNOWLEDGE); if(session != null){ //建立一个消息队列 Queue queue = session.createQueue(name); //建立一个消息消费者 MessageConsumer messageConsumer = session.createConsumer(queue, "JMSXtestId='a'"); //获取消息 messageConsumer.setMessageListener(new MessageListener(){ @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println(textMessage.getText()); //签收 textMessage.acknowledge(); } catch (JMSException e) { e.printStackTrace(); } } }); } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { ActiveMQSelectorsComsumer.getMessages("SelectorsDemo"); } }
结果:
一个生产了20个消息,这边只消费了消息A的消息,消息B的并无被消费。