RabbitMQ是实现了高级消息队列协议(Advanced Message Queueing Protocol , AMQP)的开源消息代理软件(亦称面向消息的中间件)。java
一、AMQP协议spring
RocketMQ基于JMS(Java Messaging Service , java 消息服务)协议。安全
重点:协议的做用:二者的交互基础。springboot
三个主要的功能模块:dom
1.一、“exchange”接收发布应用程序发送的消息,并根据必定的规则将这些消息路由到“消息队列”。(交换器类型:direct, fanout, topic, header)spring-boot
1.二、“message queue”存储消息,直到这些消息被消费者安全处理完为止。ui
1.三、“binding”定义了exchange和message queue之间的关联,提供路由规则。spa
AMQP协议是一个二进制协议。代理
协议模型图:code
二、RabbitMQ
2.一、交换器类型:
2.1.一、direct (默认):每一个队列都会使用它的队列名字做为路由关键字(routing key)去自动地绑定到默认交换器上。
2.1.二、fanout : 该类型的交换器会将消息转发给全部与之绑定的队列上。(相似广播机制)
2.1.三、topic : 该类型的交换器会视消息路由关键字和绑定路由关键字之间的匹配状况,进行消息的路由转发。解析:一个消息过来后,将根据路由key转发给符合要求的全部队列。
2.1.四、headers : 用于路由的属性是取自于消息header属性的,当消息header的值与队列绑定时指定的值相同时,消息就会路由至相应的队列中。
2.二、虚拟机(virtual hosts)
AMQP使用了虚拟机的概念,在一个broker上面划分出多个隔离的环境(各环境下的用户、交换器以及队列等互不影响)。这样一来,AMQP客户端们在进行链接的时候,须要协商指定同一个vhost才能进行正常的往来业务。
三、spring-boot-starter-amqp 的使用
一、BeanConfig
@Configuration public class AmqpConfig { @Resource private RabbitTemplate rabbitTemplate; @Bean public AmqpTemplate amqpTemplate() { Logger log = LoggerFactory.getLogger(RabbitTemplate.class); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.setEncoding("UTF-8"); rabbitTemplate.setMandatory(true); rabbitTemplate.setUsePublisherConnection(true); rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.info("消息发送到exchange成功,id: {}", correlationData.getId()); } else { log.info("消息发送到exchange失败,缘由: {}", cause); } }); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { String correlationId = message.getMessageProperties().getCorrelationId(); log.info("消息:{} 发送失败, 应答码:{} 缘由:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange, routingKey); }); return rabbitTemplate; } /** * 修改好友备注交换机 */ @Bean(AmqpConstant.MODIFY_FRIEND_REMAKE_NAME_DIRECT_EXCHANGE) public Exchange modifyFriendRemakeNameDirectExchange() { return ExchangeBuilder .directExchange(AmqpConstant.MODIFY_FRIEND_REMAKE_NAME_DIRECT_EXCHANGE) .durable(true) .build(); } /** * 队列 */ @Bean(AmqpConstant.MODIFY_FRIEND_REMAKE_NAME_QUEUE) public Queue modifyFriendRemakeNameQueue() { return QueueBuilder.durable(AmqpConstant.MODIFY_FRIEND_REMAKE_NAME_QUEUE).build(); } /** * 队列 关联 路由key 关联 交换机 * @return */ @Bean public Binding modifyFriendRemakeNameBinding(Queue modifyFriendRemakeNameQueue, Exchange modifyFriendRemakeNameDirectExchange) { return BindingBuilder .bind(modifyFriendRemakeNameQueue) .to(modifyFriendRemakeNameDirectExchange) .with(AmqpConstant.MODIFY_FRIEND_REMAKE_NAME_ROUTING_KEY) .noargs(); } }
二、AmqpConstant
public interface AmqpConstant { /** * 修改好友备注交换机 */ String MODIFY_FRIEND_REMAKE_NAME_DIRECT_EXCHANGE = "modifyFriendRemakeNameDirectExchange"; /** * 修改好友备注队列 */ String MODIFY_FRIEND_REMAKE_NAME_QUEUE = "modifyFriendRemakeNameQueue"; /** * 修改好友备注路由键 */ String MODIFY_FRIEND_REMAKE_NAME_ROUTING_KEY = "modifyFriendRemakeNameRoutingKey"; }
三、RabbitUtils
public class RabbitUtils { private static final Logger logger = LoggerFactory.getLogger(RabbitUtils.class); public static void sendModifyRemakeNameMsg(RabbitTemplate rabbitTemplate, RabbitFriend rabbitFriend){ try{ if(rabbitFriend.getLists().size() < 1){ return; } logger.info("sendModifyRemakeNameMsg request:"+JSON.toJSONString(rabbitFriend)); CorrelationData correlationDataId = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend( AmqpConstant.MODIFY_FRIEND_REMAKE_NAME_DIRECT_EXCHANGE, AmqpConstant.MODIFY_FRIEND_REMAKE_NAME_ROUTING_KEY, rabbitFriend, correlationDataId ); }catch (Exception e){ logger.info("mq消息发送错误",e); } } }
四、生产者
@RunWith(SpringRunner.class) @SpringBootTest public class RabbitTest { @Autowired RabbitTemplate rabbitTemplate; @Test public void test1(){ RabbitFriend rabbitFriend = new RabbitFriend(); rabbitFriend.setUserId(100000022); rabbitFriend.setLists(new ArrayList<>()); rabbitFriend.getLists().add(new RabbitNickName("2637845647931","2637845647931123哈哈")); rabbitFriend.getLists().add(new RabbitNickName("2637","2637哈哈")); RabbitUtils.sendModifyRemakeNameMsg(rabbitTemplate,rabbitFriend); try { int read = System.in.read(); } catch (IOException e) { e.printStackTrace(); } System.out.println("end!"); } }
五、消费者
@Component public class RemakeNameMessageListener { private static final Logger log = LoggerFactory.getLogger(RemakeNameMessageListener.class); @RabbitListener(queues = AmqpConstant.MODIFY_FRIEND_REMAKE_NAME_QUEUE) public void remakeNameQueueListener(Message message, Channel channel) throws IOException { try { RabbitFriend rabbitFriend = RabbitUtils.buildMessage(message, RabbitFriend.class); log.info("RemakeNameMessageListener|remakeNameQueueListener,correlationDataId:{},RabbitFriend:{}", message.getMessageProperties().getCorrelationId(), rabbitFriend); //do you things log.info("RemakeNameMessageListener|remakeNameQueueListener,Consumption of success"); channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); } catch (Exception e) { log.error("RemakeNameMessageListener|remakeNameQueueListener,Consumption of failed,cause:{}", e.getMessage()); channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true); } } }
六、springboot 配置文件
spring: rabbitmq: host: 192.168.4.21 port: 5672 username: sasa password: sasa publisher-confirms: true #手动确认 publisher-returns: true virtualHost: / #虚拟机路径 template: mandatory: true retry: enabled: true #重试 multiplier: 2 #重试次数