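RPC (Remote Procedure Call): in a single remote procedure call, the client sends a request to the server, the server processes it according to the request information and returns a response, and the call ends once the client receives that response.
In other words, after sending a message the sender receives a reply.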
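The request/reply flow described above can be sketched without a broker. This is a minimal in-memory illustration, not RabbitMQ or Spring AMQP code: the class `RpcSketch`, its `Request` type, and the `"copy: "` reply format are all hypothetical names chosen for the example. It shows how a correlation id lets the client match a reply to the request it sent.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory stand-in for a broker; every name here is hypothetical.
class RpcSketch {
    // A request carries a correlation id so its reply can be matched to it.
    static final class Request {
        final String correlationId;
        final String body;

        Request(String correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    private final BlockingQueue<Request> requestQueue = new LinkedBlockingQueue<>();
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // "Server": consumes requests and completes the matching pending reply.
    void startServer() {
        Thread server = new Thread(() -> {
            try {
                while (true) {
                    Request request = requestQueue.take();
                    CompletableFuture<String> reply = pending.remove(request.correlationId);
                    if (reply != null) {
                        reply.complete("copy: " + request.body);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.setDaemon(true); // do not keep the JVM alive for the demo server
        server.start();
    }

    // "Client": registers a pending reply, sends the request, then blocks until the reply arrives.
    String call(String body) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        requestQueue.add(new Request(correlationId, body));
        return reply.join();
    }

    public static void main(String[] args) {
        RpcSketch rpc = new RpcSketch();
        rpc.startServer();
        System.out.println(rpc.call("I am sync msg!"));
    }
}
```

The pending reply is registered before the request is queued, so the server can never process a request whose reply slot does not exist yet.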
The queues and exchanges used below were already declared in "Using RabbitMQ in SpringBoot (Part 2)".
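import java.util.UUID;
// Spring AMQP 2.1+; older versions use org.springframework.amqp.rabbit.support.CorrelationData
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class SyncSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send() {
        String content = "I am sync msg!";
        System.out.println("########### send : " + content);
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // Blocks the calling thread until the consumer's reply arrives (or the reply timeout expires).
        Object response = rabbitTemplate.convertSendAndReceive("directExchange", "info-msg", content, correlationData);
        System.out.println("########### response : " + response);
    }
}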
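import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "infoMsgQueue")
public class SyncReceiver {
    // The return value is sent back to the sender as the reply.
    @RabbitHandler
    public String process(String message) throws InterruptedException {
        System.out.println("########### SyncReceiver Receive :" + message);
        Thread.sleep(1000 * 3); // simulate 3 seconds of processing
        return "copy";
    }
}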
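import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(value = SpringJUnit4ClassRunner.class)
@SpringBootTest
public class SyncTest {
    @Autowired
    private SyncSender syncSender;

    @Test
    public void send() {
        // Each call blocks until its reply is received.
        for (int i = 0; i < 10; i++) {
            syncSender.send();
        }
    }
}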
Result: the next message is sent only after the reply to the previous one has been received.
########### send : I am sync msg!
########### SyncReceiver Receive :I am sync msg!
########### response : copy
########### send : I am sync msg!
########### SyncReceiver Receive :I am sync msg!
########### response : copy
########### send : I am sync msg!
########### SyncReceiver Receive :I am sync msg!
########### response : copy
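rabbitTemplate has a reply timeout that defaults to 5 seconds. If the producer receives no reply within that time, convertSendAndReceive() gives up and returns null rather than a reply. The timeout can be changed with rabbitTemplate.setReplyTimeout(), which takes a value in milliseconds.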
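The effect of a reply timeout can be illustrated in plain Java. This is a hedged sketch, not Spring AMQP code: `ReplyTimeoutSketch` and `awaitReply` are hypothetical names, and signalling the timeout by returning `null` is a choice made for this sketch. The caller waits a bounded time for the reply and then gives up, so a slow or missing consumer cannot block the producer forever.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper that models "wait for a reply, but only for so long".
class ReplyTimeoutSketch {
    // Waits up to timeoutMillis for the reply; returns null if none arrives in time.
    static String awaitReply(CompletableFuture<String> reply, long timeoutMillis) {
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null; // no reply within the timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException("reply failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> answered = CompletableFuture.completedFuture("copy");
        CompletableFuture<String> unanswered = new CompletableFuture<>();
        System.out.println(awaitReply(answered, 100));   // reply already available
        System.out.println(awaitReply(unanswered, 100)); // gives up after about 100 ms
    }
}
```

Whatever the timeout value, callers of a blocking request/reply method should check the result before using it, since a missing reply is a normal outcome rather than a programming error.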
For asynchronous calls, use AsyncRabbitTemplate.
// Declared inside a @Configuration class; wraps the existing RabbitTemplate.
@Bean
public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate) {
    return new AsyncRabbitTemplate(rabbitTemplate);
}
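import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Component
public class AsyncSender {
    @Autowired
    private AsyncRabbitTemplate rabbitTemplate;

    public void send() {
        String content = "I am async msg!";
        System.out.println("########### send : " + content);
        // Returns immediately; the reply is delivered to the callback later.
        AsyncRabbitTemplate.RabbitConverterFuture<Object> future =
                rabbitTemplate.convertSendAndReceive("directExchange", "warn-msg", content);
        future.addCallback(new ListenableFutureCallback<Object>() {
            @Override
            public void onFailure(Throwable throwable) {
                // Called when no reply arrives in time or the send fails.
                System.out.println("########### failed : " + throwable.getMessage());
            }

            @Override
            public void onSuccess(Object o) {
                System.out.println("aaa : " + o);
            }
        });
    }
}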
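import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@RabbitListener(queues = "warnMsgQueue")
@Component
public class AsyncReceiver {
    // The return value is sent back to the sender as the reply.
    @RabbitHandler
    public String process(String message) throws InterruptedException {
        // Log label kept as in the sample output below.
        System.out.println("########### SyncReceiver Receive :" + message);
        Thread.sleep(1000 * 1); // simulate 1 second of processing
        return "hao";
    }
}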
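import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(value = SpringJUnit4ClassRunner.class)
@SpringBootTest
public class AsyncTest {
    @Autowired
    private AsyncSender asyncSender;

    @Test
    public void send() {
        // Each call returns without waiting for its reply.
        for (int i = 0; i < 10; i++) {
            asyncSender.send();
        }
    }
}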
Result: messages are sent one after another without waiting for the reply to the previous one.
########### send : I am async msg!
########### send : I am async msg!
########### send : I am async msg!
########### send : I am async msg!
########### SyncReceiver Receive :I am async msg!
aaa : hao
########### SyncReceiver Receive :I am async msg!
aaa : hao
########### SyncReceiver Receive :I am async msg!
aaa : hao
########### SyncReceiver Receive :I am async msg!
aaa : hao
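The non-blocking pattern behind this output can be sketched with the JDK's `CompletableFuture`. This is an illustration under stated assumptions, not Spring AMQP code: `AsyncRpcSketch`, `sendAndReceive`, and `sendAll` are hypothetical names, and the "remote" side is simulated by a short sleep on another thread. Every send returns a future at once, callbacks collect the replies, and the caller waits for all of them only at the very end.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for an asynchronous request/reply client.
class AsyncRpcSketch {
    // Returns immediately; the future is completed later on another thread.
    static CompletableFuture<String> sendAndReceive(String body) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100); // simulate a slow consumer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "hao";
        });
    }

    // Sends count messages without blocking between sends, then waits for every reply.
    static List<String> sendAll(int count) {
        List<String> replies = new CopyOnWriteArrayList<>();
        List<CompletableFuture<Void>> callbacks = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            System.out.println("send " + i);
            // thenAccept plays the role of the success callback.
            callbacks.add(sendAndReceive("I am async msg!").thenAccept(replies::add));
        }
        // Wait here once, so the caller does not exit before the replies arrive.
        CompletableFuture.allOf(callbacks.toArray(new CompletableFuture[0])).join();
        return replies;
    }

    public static void main(String[] args) {
        System.out.println(sendAll(4));
    }
}
```

The final wait matters in tests: a test method that only fires asynchronous sends may finish, and shut the application context down, before all callbacks have run.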