package com.shi.page; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQTextMessage; import org.junit.Test; /** * * @author: SHF * @date: 2018年3月16日 上午8:48:10 * @Description:消息队列测试类 */ public class ActiveMQTest { /** * 点到点形式 发送 消息 生产者 * @throws Exception */ @Test public void queueProducerTest()throws Exception{ //1.建立一个链接工厂对象,须要指定服务的ip和端口 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.36.40:61616"); //2.使用工厂对象建立一个Connection对象 Connection connection = connectionFactory.createConnection(); //3.开启链接,调用Connection对象的start方法 connection.start(); //4.建立一个Session对象 //第一个参数:是否开启事物。若是开启事物第二个参数无心义。通常不开启事物。 //第二个参数:应答模式,通常:自动应答,手动应答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.使用Session对象建立一个Destination对象,俩种形式queue,topic,如今使用queue Queue queue = session.createQueue("test-queue");//Queue extends Destination //6.使用Session对象建立一个producer对象 MessageProducer producer = session.createProducer(queue); //7.建立一个Message对像,能够使用TextMessage /*TextMessage textMessage=new ActiveMQTextMessage(); textMessage.setText("你要发送的消息");*/ TextMessage textMessage = session.createTextMessage("queue你要发送的消息"); //8.发送消息 producer.send(textMessage); //9.关闭资源 producer.close(); session.close(); connection.close(); } /** * 点到点接受消息 消费者 * @throws Exception */ @Test public void queueConsumerTest()throws Exception{ //1 建立一个ConnectionFactory对象链接MQ服务器 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.36.40:61616"); //2 建立一个链接对象 Connection connection = connectionFactory.createConnection(); //3 开启链接 connection.start(); //4 使用Connection对象建立一个Session对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5 建立一个Destination对象 queue对象 Queue queue = session.createQueue("test-queue"); //6 使用Session对象建立一个消费者对象 MessageConsumer consumer = session.createConsumer(queue); //7 接受消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message paramMessage) { // 接受到消息的回调函数 TextMessage testMessage=(TextMessage) paramMessage; try { //8 打印消息 System.out.println(testMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); System.in.read();//等待接受消息 //9 关闭链接 consumer.close(); session.close(); connection.close(); } /** * 一对多 发送消息 生产者 * @throws Exception */ @Test public void topicProducerTest()throws Exception{ //1.建立一个链接工厂对象,须要指定服务的ip和端口 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.36.40:61616"); //2.使用工厂对象建立一个Connection对象 Connection connection = connectionFactory.createConnection(); //3.开启链接,调用Connection对象的start方法 connection.start(); //4.建立一个Session对象 //第一个参数:是否开启事物。若是开启事物第二个参数无心义。通常不开启事物。 //第二个参数:应答模式,通常:自动应答,手动应答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.使用Session对象建立一个Destination对象,俩种形式queue,topic,如今使用topic Topic topic = session.createTopic("test-topic");//Queue extends Destination //6.使用Session对象建立一个producer对象 MessageProducer producer = session.createProducer(topic); //7.建立一个Message对像,能够使用TextMessage /*TextMessage textMessage=new ActiveMQTextMessage(); textMessage.setText("你要发送的消息");*/ TextMessage textMessage = session.createTextMessage("topic你要发送的消息"); //8.发送消息 producer.send(textMessage); //9.关闭资源 producer.close(); session.close(); connection.close(); } /** * 一对多接受消息 消费者 * @throws Exception */ @Test public void topicConsumerTest()throws Exception{ //1 建立一个ConnectionFactory对象链接MQ服务器 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.36.40:61616"); //2 建立一个链接对象 Connection connection = connectionFactory.createConnection(); //3 开启链接 connection.start(); //4 使用Connection对象建立一个Session对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5 建立一个Destination对象 topic对象 Topic topic = session.createTopic("test-topic"); //6 使用Session对象建立一个消费者对象 MessageConsumer consumer = session.createConsumer(topic); //7 接受消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message paramMessage) { // 接受到消息的回调函数 TextMessage testMessage=(TextMessage) paramMessage; try { //8 打印消息 System.out.println(testMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); System.out.println("topic消费者3 已经启动..."); System.in.read();//等待接受消息 //9 关闭链接 consumer.close(); session.close(); connection.close(); } }