MQ学习系列:html
activeMQ启动错误 BeanFactory not initializedjava
在session接口中定义的几个常量:apache
消息消费端在建立Session对象时须要指定应答模式为客户端手动应答,当消费者获取到消息并成功处理后须要调用message.acknowledge()方法进行应答,通知Broker消费成功。若是处理过程当中出现异常,须要调用session.recover()通知Broker重复消息,默认最多重复6次。windows
<dependencies> <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-client --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.15.8</version> </dependency> <!-- https://mvnrepository.com/artifact/junit/junit --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> </dependencies>
package org.newmean; import org.apache.activemq.ActiveMQConnectionFactory; import org.junit.Test; import javax.jms.*; public class ActiveMQTest { //消息发送方-producter @Test public void test1() throws JMSException { //建立链接工厂对象 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); //从工厂中获取一个链接对象 Connection connection = connectionFactory.createConnection(); //链接MQ服务 connection.start(); //获取session对象 //参数说明 b 是否使用事务 i jms消息确认机制 1 2 3 0 用常量表示 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //经过session建立Topic Topic topic = session.createTopic("TestTopic"); //经过session建立消息发送者 MessageProducer producer = session.createProducer(topic); //经过session建立消息对象 TextMessage message = session.createTextMessage("hello"); //发送消息 producer.send(message); //关闭资源 producer.close(); session.close(); connection.close(); } //消息接收方-consumer @Test public void test2() throws JMSException { //建立链接工厂对象 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); //从工厂中获取一个链接对象 Connection connection = connectionFactory.createConnection(); //链接MQ服务 connection.start(); //获取session对象 //参数说明 b 是否使用事务 i jms消息确认机制 1 2 3 0 用常量表示 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //经过session建立Topic Topic topic = session.createTopic("TestTopic"); //经过session建立消费者 MessageConsumer consumer = session.createConsumer(topic); //指定消息监听器 consumer.setMessageListener(new MessageListener() { //当咱们监听的topic中存在消息,onMessage这个方法就会自动运行 public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("消费者接收到了消息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); //由于要接收消息不能关闭,同时线程不能死掉 while (true){ } } }
先启动test2方法发起订阅“TestTopic”消息,而后启动test1方法,这时消费者收到了消息。session
消息重发模拟maven
咱们只须要更消息接收方的代码,改动以下:tcp
//消息接收方-consumer @Test public void test2() throws JMSException { //建立链接工厂对象 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); //从工厂中获取一个链接对象 Connection connection = connectionFactory.createConnection(); //链接MQ服务 connection.start(); //获取session对象 //参数说明 b 是否使用事务 i jms消息确认机制 1 2 3 0 用常量表示 final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); //经过session建立Topic Topic topic = session.createTopic("TestTopic"); //经过session建立消费者 MessageConsumer consumer = session.createConsumer(topic); //指定消息监听器 consumer.setMessageListener(new MessageListener() { //当咱们监听的topic中存在消息,onMessage这个方法就会自动运行 public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { if(textMessage.getText().equals("nihao")){ System.out.println("消费者接收到了消息:"+textMessage.getText()); message.acknowledge(); }else { System.out.println("消息处理失败了.."); session.recover(); } } catch (JMSException e) { e.printStackTrace(); } } }); //由于要接收消息不能关闭,同时线程不能死掉 while (true){ } }
先启动test2方法发起订阅“TestTopic”消息,而后启动test1方法,这时消费者就会调用session.recover()方法让消息发布者重发消息默认6次,咱们可以看到7条(第一次+重发六次)“消息处理失败了..”输出。ide