一、消息头java
二、消息体spring
三、消息属性apache
一、TextMessage springboot
二、MapMessage session
三、ObjectMessage tcp
四、BytesMessage ide
五、StreamMessageurl
一、非持久化spa
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);code
非持久化,当mq宕机后消息不存在
二、持久化(消息默认是持久化)
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
持久化,当mq宕机后消息存在
一、非持久化队列
生产者
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ProjectName: springbootActiveMQ * @Package: cn.**.test * @Author: huat * @Date: 2020/1/2 17:04 * @Version: 1.0 */ public class ActiveMQTest { //url路径 private static final String ACTRIVE_URL="tcp://192.168.44.135:61616"; //队列名称 private static final String QUEUE_NAME="queue01"; //主题名称 private static final String TOPIC_NAME = "topic01"; public static void main(String[] args) { //一、建立链接工厂 //若是帐号密码没有修改的话,帐号密码默认均为admin ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL); //若是帐号密码修改的话 //第一个参数为帐号,第二个为密码,第三个为请求的url //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL); try { //二、经过链接工厂获取链接 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //三、建立session会话 //里面会有两个参数,第一个为事物,第二个是签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //四、建立目的地(具体是队列仍是主题),这里是建立队列 Queue queue=session.createQueue(QUEUE_NAME); //五、建立消息生产者,队列模式 MessageProducer messageProducer = session.createProducer(queue); //六、经过messageProducer生产三条消息发送到MQ消息队列中 for (int i=0;i<3;i++){ //七、建立消息 TextMessage textMessage = session.createTextMessage("msg----->" + i);//建立一个文本消息 //消息属性 textMessage.setStringProperty("c01","vip"); //八、经过messageProducer发送给mq messageProducer.send(textMessage); //九、数据非持久化 messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } messageProducer.close(); session.close(); connection.close(); System.out.println("消息发送成功"); } catch (JMSException e) { e.printStackTrace(); } } }
消费者
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ProjectName: springbootActiveMQ * @Package: cn.**.test * @Author: huat * @Date: 2020/1/3 8:47 * @Version: 1.0 */ public class ActiveMQConsumer { //url路径 private static final String ACTRIVE_URL="tcp://192.168.44.135:61616"; //队列名称 private static final String QUEUE_NAME="queue01"; public static void main(String[] args) { //一、建立链接工厂 //若是帐号密码没有修改的话,帐号密码默认均为admin ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL); //若是帐号密码修改的话 //第一个参数为帐号,第二个为密码,第三个为请求的url //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL); try { //二、经过链接工厂获取链接 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //三、建立session会话 //里面会有两个参数,第一个为事物,第二个是签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //四、这里接受的queue的名称要和发送者的一致 Queue queue = session.createQueue(QUEUE_NAME); //五、建立消费者 MessageConsumer consumer = session.createConsumer(queue); //六、经过监听的方式消费消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { //若是message不等于null而且属于TextMessage类型(由于消息发送的类型是TextMessage,因此这里判断是不是这个类型) if(null!=message&&message instanceof TextMessage){ TextMessage textMessage=(TextMessage)message; try { System.out.println(textMessage.getText()); //获取消息属性 System.out.println("消息属性--->"+textMessage.getStringProperty("c01")); } catch (JMSException e) { e.printStackTrace(); } } } }); //七、保证控制台一直在运行 System.in.read(); //八、闭资源 consumer.close(); session.close(); connection.close(); }catch (Exception e){ e.printStackTrace(); } } }
二、非持久化主题
生产者
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ProjectName: springbootActiveMQ * @Package: cn.**.test * @Author: huat * @Date: 2020/1/4 9:29 * @Version: 1.0 */ public class ActiveMQTopicTest { //url路径 private static final String ACTRIVE_URL="tcp://192.168.44.135:61616"; //主题名称 private static final String TOPIC_NAME = "topic01"; public static void main(String[] args) { //一、建立链接工厂 //若是帐号密码没有修改的话,帐号密码默认均为admin ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL); //若是帐号密码修改的话 //第一个参数为帐号,第二个为密码,第三个为请求的url //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL); try { //二、经过链接工厂获取链接 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //三、建立session会话 //里面会有两个参数,第一个为事物,第二个是签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //四、建立目的地(具体是队列仍是主题),这里是建立主题 Topic topic=session.createTopic(TOPIC_NAME); //五、建立消息生产者,主题模式 MessageProducer messageProducer = session.createProducer(topic); //六、经过messageProducer生产三条消息发送到MQ消息主题中 for (int i=0;i<3;i++){ //七、建立消息 TextMessage textMessage = session.createTextMessage("msg----->" + i);//建立一个文本消息 //八、经过messageProducer发送给mq messageProducer.send(textMessage); } messageProducer.close(); session.close(); connection.close(); System.out.println("消息发送成功"); } catch (JMSException e) { e.printStackTrace(); } } }
消费者
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ProjectName: springbootActiveMQ * @Package: cn.**.test * @Author: huat * @Date: 2020/1/4 9:43 * @Version: 1.0 */ public class ActiveMQTopicConsumer { //url路径 private static final String ACTRIVE_URL="tcp://192.168.44.135:61616"; //主题名称 private static final String TOPIC_NAME = "topic01"; public static void main(String[] args) { //一、建立链接工厂 //若是帐号密码没有修改的话,帐号密码默认均为admin ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL); //若是帐号密码修改的话 //第一个参数为帐号,第二个为密码,第三个为请求的url //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL); try { //二、经过链接工厂获取链接 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //三、建立session会话 //里面会有两个参数,第一个为事物,第二个是签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //四、这里接受的queue的名称要和发送者的一致 Topic topic = session.createTopic(TOPIC_NAME); //五、建立消费者 MessageConsumer consumer = session.createConsumer(topic); //六、经过监听的方式消费消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { //若是message不等于null而且属于TextMessage类型(由于消息发送的类型是TextMessage,因此这里判断是不是这个类型) if(null!=message&&message instanceof TextMessage){ TextMessage textMessage=(TextMessage)message; try { System.out.println(textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }); //七、保证控制台一直在运行 System.in.read(); //八、闭资源 consumer.close(); session.close(); connection.close(); }catch (Exception e){ e.printStackTrace(); } } }
三、持久化队列
生产者
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ProjectName: springbootActiveMQ * @Package: cn.**.test * @Author: huat * @Date: 2020/1/2 17:04 * @Version: 1.0 */ public class ActiveMQTest { //url路径 private static final String ACTRIVE_URL="tcp://192.168.44.135:61616"; //队列名称 private static final String QUEUE_NAME="queue01"; //主题名称 private static final String TOPIC_NAME = "topic01"; public static void main(String[] args) { //一、建立链接工厂 //若是帐号密码没有修改的话,帐号密码默认均为admin ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL); //若是帐号密码修改的话 //第一个参数为帐号,第二个为密码,第三个为请求的url //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL); try { //二、经过链接工厂获取链接 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //三、建立session会话 //里面会有两个参数,第一个为事物,第二个是签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //四、建立目的地(具体是队列仍是主题),这里是建立队列 Queue queue=session.createQueue(QUEUE_NAME); //五、建立消息生产者,队列模式 MessageProducer messageProducer = session.createProducer(queue); //六、经过messageProducer生产三条消息发送到MQ消息队列中 for (int i=0;i<3;i++){ //七、建立消息 TextMessage textMessage = session.createTextMessage("msg----->" + i);//建立一个文本消息 //消息属性 textMessage.setStringProperty("c01","vip"); //八、经过messageProducer发送给mq messageProducer.send(textMessage); //九、数据持久化 messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); } messageProducer.close(); session.close(); connection.close(); System.out.println("消息发送成功"); } catch (JMSException e) { e.printStackTrace(); } } }
消费者
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ProjectName: springbootActiveMQ * @Package: cn.**.test * @Author: huat * @Date: 2020/1/3 8:47 * @Version: 1.0 */ public class ActiveMQConsumer { //url路径 private static final String ACTRIVE_URL="tcp://192.168.44.135:61616"; //队列名称 private static final String QUEUE_NAME="queue01"; public static void main(String[] args) { //一、建立链接工厂 //若是帐号密码没有修改的话,帐号密码默认均为admin ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL); //若是帐号密码修改的话 //第一个参数为帐号,第二个为密码,第三个为请求的url //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL); try { //二、经过链接工厂获取链接 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //三、建立session会话 //里面会有两个参数,第一个为事物,第二个是签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //四、这里接受的queue的名称要和发送者的一致 Queue queue = session.createQueue(QUEUE_NAME); //五、建立消费者 MessageConsumer consumer = session.createConsumer(queue); //六、经过监听的方式消费消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { //若是message不等于null而且属于TextMessage类型(由于消息发送的类型是TextMessage,因此这里判断是不是这个类型) if(null!=message&&message instanceof TextMessage){ TextMessage textMessage=(TextMessage)message; try { System.out.println(textMessage.getText()); //获取消息属性 System.out.println("消息属性--->"+textMessage.getStringProperty("c01")); } catch (JMSException e) { e.printStackTrace(); } } } }); //七、保证控制台一直在运行 System.in.read(); //八、闭资源 consumer.close(); session.close(); connection.close(); }catch (Exception e){ e.printStackTrace(); } } }
四、持久化主题
生产者
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ProjectName: springbootActiveMQ * @Package: cn.**.test * @Author: huat * @Date: 2020/1/4 9:29 * @Version: 1.0 */ public class ActiveMQTopicTest { //url路径 private static final String ACTRIVE_URL="tcp://192.168.44.135:61616"; //主题名称 private static final String TOPIC_NAME = "topic01"; public static void main(String[] args) { //一、建立链接工厂 //若是帐号密码没有修改的话,帐号密码默认均为admin ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL); //若是帐号密码修改的话 //第一个参数为帐号,第二个为密码,第三个为请求的url //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL); try { //二、经过链接工厂获取链接 Connection connection = activeMQConnectionFactory.createConnection(); //三、建立session会话 //里面会有两个参数,第一个为事物,第二个是签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //四、建立目的地(具体是队列仍是主题),这里是建立主题 Topic topic=session.createTopic(TOPIC_NAME); //五、建立消息生产者,主题模式 MessageProducer messageProducer = session.createProducer(topic); //持久化数据 messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); connection.start(); //六、经过messageProducer生产三条消息发送到MQ消息主题中 for (int i=0;i<3;i++){ //七、建立消息 TextMessage textMessage = session.createTextMessage("msg----->" + i);//建立一个文本消息 //八、经过messageProducer发送给mq messageProducer.send(textMessage); } messageProducer.close(); session.close(); connection.close(); System.out.println("消息发送成功"); } catch (JMSException e) { e.printStackTrace(); } } }
消费者
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ProjectName: springbootActiveMQ * @Package: cn.bdqn.test * @Author: huat * @Date: 2020/1/4 9:43 * @Version: 1.0 */ public class ActiveMQTopicConsumer { //url路径 private static final String ACTRIVE_URL="tcp://192.168.44.135:61616"; //主题名称 private static final String TOPIC_NAME = "topic01"; public static void main(String[] args) { //一、建立链接工厂 //若是帐号密码没有修改的话,帐号密码默认均为admin ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL); //若是帐号密码修改的话 //第一个参数为帐号,第二个为密码,第三个为请求的url //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL); try { //二、经过链接工厂获取链接 Connection connection = activeMQConnectionFactory.createConnection(); //订阅名称 connection.setClientID("test01"); //三、建立session会话 //里面会有两个参数,第一个为事物,第二个是签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //四、这里接受的queue的名称要和发送者的一致 Topic topic = session.createTopic(TOPIC_NAME); //五、持久化的订阅者,第一个参数为订阅主题名称,第二个为备注 TopicSubscriber topicSubscriber=session.createDurableSubscriber(topic,"remakr"); //六、启动链接 connection.start(); //receive等待消息,不限制时间 Message message = topicSubscriber.receive(); while (null!=message){ TextMessage textMessage=(TextMessage)message; System.out.println("******------>"+textMessage.getText()); //等待五秒钟 message=topicSubscriber.receive(5000L); } session.close(); connection.close(); }catch (Exception e){ e.printStackTrace(); } } }
主题持久化注意事项
一、若是消费者注册关闭后,消息提供者在消费者关闭后在发送消息,消费者再打开,会收到消息提供者发送的消息。
二、若是在消费者没有注册以前,消息提供者发送消息,消费者不会收到以前的消息
总结:
一、必定要先运行一次消费者,等同于向MQ注册,相似我订阅了这个主题
二、而后在运行消费者发送消息
三、不管消费者是否在线,都会收到消息,不在线的话,下次链接时会把没有收到过的消息所有接受。