引入maven依赖 <!-- activemq --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency>为了便于管理mq这里统一在xml中配置:java
<mq-clients> <producer> <id>demo.test</id> <topic>MQ_TEST</topic> <mq.type>1</mq.type> <delivery.mode>1</delivery.mode> <acknowledge>1</acknowledge> </producer> <producer> <id>demo.test2</id> <topic>MQ_TEST2</topic> <mq.type>2</mq.type> <delivery.mode>1</delivery.mode> <acknowledge>1</acknowledge> </producer> <consumer> <id>demo.consumer.test1</id> <topic>MQ_TEST</topic> <mq.type>1</mq.type> <message.listener>com.ule.microtopup.mq.listener.ActiveMQMessageListener</message.listener> </consumer> <consumer> <id>demo.consumer.test2</id> <topic>MQ_TEST2</topic> <mq.type>2</mq.type> <message.listener>com.ule.microtopup.mq.listener.ActiveMQMessageListener2</message.listener> </consumer> </mq-clients>XMLUtil:用来读取xmlnode
/** * 获取全部节点 * @param root 根节点 * @param map 记录每一个节点及值 */ @SuppressWarnings("unchecked") private static void getNode(Element root, LinkedHashMap<String, String> map) { List<Element> list = root.elements(); Iterator<Element> iterator = list.iterator(); while (iterator.hasNext()) { Element element = iterator.next(); if (element.elements() != null && element.elements().size() > 0) { System.out.println("element:"+element.getName()); getNode(element, map); } else { map.put(element.getParent().getName() + "." + element.getName(), element.getTextTrim()); } } } /** * 读XML文件指定节点内容 * @param xmlName xml文件名 * @param nodeName 指定节点 * @return * @throws Exception */ public static Map<String, String> reader(String xmlName,String nodeName)throws Exception{ if(StringUtils.isEmpty(xmlName)){ throw new NullPointerException("xmlName cannot be null!"); } LinkedHashMap<String, String> returnValue = new LinkedHashMap<String, String>(); InputStream in = XMLUtil.class.getClassLoader().getResourceAsStream(xmlName); SAXReader reader = new SAXReader(); Document document = reader.read(in); Element root = document.getRootElement(); if(StringUtils.isNotEmpty(nodeName)){ root = document.getRootElement().element(nodeName); } //获取节点 getNode(root, returnValue); if (returnValue.size()>0) { for (String key : returnValue.keySet()) { System.out.println("key:" + key + " ,value:" + returnValue.get(key)); } } return returnValue; } /** * 读XML文件全部内容,并将文件转成对象 * @param xmlName 文件名 * @param cls * @return * @throws Exception */ @SuppressWarnings("unchecked") public static <T> T readerXmlToBean(String xmlName ,Class<?>...cls)throws Exception{ if(StringUtils.isEmpty(xmlName)){ throw new NullPointerException("xmlName cannot be null!"); } InputStream in = XMLUtil.class.getClassLoader().getResourceAsStream(xmlName); JAXBContext context = JAXBContext.newInstance(cls);// 获取上下文对象 Unmarshaller unmarshaller = context.createUnmarshaller(); T t = (T)unmarshaller.unmarshal(in); return t; }Producer:git
@XmlRootElement(name="producer") public class Producer { private String id; // 主题 private String topic; // 类型,1-queue,2-topic private Integer mqType; // 持久化方式 :1-非持久,2-持久化 private Integer deliveryMode; // 签收方式:1-自动签收,2-客户端确认,3-自动批量确认,0-事务提交并确认 private Integer acknowledge; //省略get set }Consumer:spring
@XmlRootElement(name = "consumer") public class Consumer { private String id; private String topic; private Integer mqType; private Class<? extends MessageListener> messageListener; ... }MessageUtil:mq消息集中处理类,包括发送消息,启动消费监听等springboot
private static MqConnectionFactory mqFactory = MqConnectionFactory.INSTANCE; private static Connection conn = null; private static Session session = null; public static void init() { try { // 获取一个链接 if (conn == null) { conn = mqFactory.getConnection(); } conn.start(); // 自动提交事务 if (session == null) { /* * Session.AUTO_ACKNOWLEDGE 消息自动签收 * Session.CLIENT_ACKNOWLEDGE 客戶端调用acknowledge方法手动签收 * Session.DUPS_OK_ACKNOWLEDGE 不是必须签收,消息可能会重复发送。在第二次从新传送消息的时候,消息 * 头的JmsDelivered会被置为true标示当前消息已经传送过一次,客户端须要进行消息的重复处理控制。 */ session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); } } catch (Exception e) { e.printStackTrace(); } } /** * * @param obj 序列化对象 * @param topic * @param isQueue * @throws Exception */ public static void sendObjectMessage(Serializable obj, String id) throws Exception { init(); Producer p = getProducerById(id); MessageProducer producer = getMessageProducer(getDestination(p), p.getDeliveryMode()); producer.send(session.createObjectMessage(obj)); destroy(producer); } private static Producer getProducerById(String id) { Producer p = MQUtil.getProducerById(id); if (p == null) { throw new NullPointerException("according to id:" + id + ", not found produer."); } return p; } public static void sendTextMessage(String mes, String id) throws Exception { init(); Producer p = getProducerById(id); MessageProducer producer = getMessageProducer(getDestination(p), p.getDeliveryMode()); producer.send(session.createTextMessage(mes)); destroy(producer); } private static MessageProducer getMessageProducer(Destination destination, Integer deliveryMode) throws Exception { MessageProducer producer = session.createProducer(destination); /** * PERSISTENT(持久性消息): * 这是ActiveMQ的默认传送模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。 * 可靠性的另外一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们以前不会丢失这些消息。这意味着在持久性消息传送至目标时, * 消息服务将其放入持久性数据存储。若是消息服务因为某种缘由致使失败, * 它能够恢复此消息并将此消息传送至相应的消费者。虽然这样增长了消息传送的开销,但却增长了可靠性。 * NON_PERSISTENT(非持久性消息): * 保证这些消息最多被传送一次。对于这些消息,可靠性并不是主要的考虑因素。 * 此模式并不要求持久性的数据存储,也不保证消息服务因为某种缘由致使失败后消息不会丢失。 * */ producer.setDeliveryMode(deliveryMode); return producer; } private static Destination getDestination(Producer p) throws Exception { return getDestination(p.getMqType(), p.getTopic()); } private static Destination getDestination(Consumer c) throws Exception { return getDestination(c.getMqType(), c.getTopic()); } private static Destination getDestination(Integer mqType, String topic) throws Exception { Destination destination = null; if (ActiveMqType.QUEUE == mqType) destination = session.createQueue(topic); else if (ActiveMqType.TOPIC == mqType) destination = session.createTopic(topic); else throw new IllegalArgumentException("mqType must be 1 or 2."); return destination; } /** * 启动全部监听 * @param c * @throws Exception */ public static void startConsumer(Consumer c) throws Exception { init(); MessageConsumer consumer = session.createConsumer(MessageUtil.getDestination(c)); MessageListener listener = c.getMessageListener().newInstance(); consumer.setMessageListener(listener); } private static void destroy(MessageProducer producer) throws JMSException { if (producer != null) { producer.close(); } if (session != null) { session.close(); session = null; } if (conn != null) { conn.close(); conn = null; } } public static void destroy(MessageConsumer consumer) throws JMSException { if (consumer != null) { consumer.close(); consumer = null; } if (session != null) { session.close(); session = null; } if (conn != null) { conn.close(); conn = null; } }细节不在赘述,具体代码已上传至码云:https://gitee.com/savage_xiao/boot.demo/tree/mastersession
有兴趣能够下载下来看一下,其中有包含其余springboot的研究maven
测试代码:spring-boot
public static void main(String[] args) { try { for(int i = 101; i<200;i++){ MessageUtil.sendTextMessage("hello world!"+","+(i+1), "demo.test"); } } catch (Exception e) { e.printStackTrace(); } } ----------------------- public static void main(String[] args) { try { for(int i = 0; i<100;i++){ MessageUtil.sendTextMessage("hello world2!"+","+(i+1), "demo.test2"); } } catch (Exception e) { e.printStackTrace(); } }
测试结果:测试
listener2:hello world2!,1 listener2:hello world2!,2 listener2:hello world2!,3 listener2:hello world2!,4 listener2:hello world2!,5 .... ....略 listener2:hello world2!,98 listener2:hello world2!,99 listener2:hello world2!,100 listener:hello world!,102 listener:hello world!,103 ... ...略 listener:hello world!,197 listener:hello world!,198 listener:hello world!,199 listener:hello world!,200ActiveMq如何搭建如何使用请看上一篇:http://www.javashuo.com/article/p-gfwpvgty-dq.html.net