activeMQ的request-response请求响应模式

一:为何须要请求响应模式java

  在消息中间中,生产者只负责生产消息,而消费者只负责消费消息,二者并没有直接的关联。可是若是生产者想要知道消费者有没有消费完,或者用不用从新发送的时候,这时就要用到请求响应模式。spring

  应用场景:session

  1:主要肯定mq有没有正确的消费消息。  ide

  2:当某一个业务发送mq,可是须要返回结果,这时候就要用到请求响应模式。应用的场景不是不少。spring-boot

二:具体的代码操做测试

 第一种:activeMQ的spring代码ui

  在生产者的xml配置文件中,加上一个监听的。spa

<jms:listener-container destination-type="queue" container-type="default"
connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="tempqueue" ref="getResponse"></jms:listener>
</jms:listener-container>

 2:生产的java代码
jmsTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Message message1 = session.createTextMessage(message);
//发送的时候,告诉消费者应答消息发送到那里
Destination destination = session.createTemporaryQueue();
MessageConsumer messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(getResponse);
message1.setJMSReplyTo(destination);

String uid = System.currentTimeMillis()+"";
message1.setJMSCorrelationID(uid);

return message1;
}
});
3:生产者建立一个消费
@Component public class GetResponse implements MessageListener { public void onMessage(Message message) { try { System.out.println("GetResponse accept msg :"+((TextMessage)message).getText()); }catch (Exception e){ e.printStackTrace(); } } }
View Code

 

4:修改消费端代码code

try {
System.out.println("QueueReceiver1 accept msg : "+((TextMessage)message).getText());
//业务工做
reploy.send(((TextMessage)message).getText(),message);  //消费完成之后发送一个消息,告诉生产者已经消费成功。
}catch (Exception e){
e.printStackTrace();
}
5:建立消费者发送消息
public void send(final String consumerMsg, Message produceMessage) throws Exception{
jmsTemplate.send(produceMessage.getJMSReplyTo(), new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Message msg = session.createTextMessage("ReplyTo "+consumerMsg);
return msg;
}
});
}

以上就是在spring当中使用,请求-应答模式。



第二种:在spring-boot当中使用请求-应答
1:mq的发送者
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.annotation.JmsListener; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.stereotype.Service; import javax.jms.Destination; /** * mq发送者 */ @Service public class ActivePro { @Autowired private JmsMessagingTemplate jmsTemplate; public void sendMessage(Destination destination, String message){ jmsTemplate.convertAndSend(destination,message); } @JmsListener(destination = "boot.reploy") public void receiveQueue(String text){ System.out.println(text); } }
View Code

 2:mq的消费者xml

import org.springframework.jms.annotation.JmsListener; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Component; /** * mq的消费者 */ @Component public class ActiveCon { @JmsListener(destination = "boot1.queue") @SendTo("boot.reploy") public String receiveQueue(String text){ System.out.println(text); return "I am tom"; } }
View Code

 3:测试的

 @Test public void contextLoads2() { try { Destination destination =
                    new ActiveMQQueue("boot1.queue"); produce.sendMessage(destination,"aaaaa"); System.in.read(); }catch (Exception e){ e.printStackTrace(); } }
View Code