咱们如今有两个主要的系统一个是活动系统一个是奖品系统,活动系统会调用奖品系统发放奖励。git
最开始两个之间只经过http直接调用,优势:开发成本低,没有多余组件引入;发放奖励实时返回;活动系统不须要管奖品是否还有剩余库存。缺点:这样就致使上游活动系统强依赖于下游的奖品系统,若是一旦奖品系统挂掉,咱们活动系统也就不可用了;这里还有个bug,在调用奖品系统发放奖励,奖品系统发放成功了,可是活动系统请求超时了,就会致使提示客户的没有奖品了,可是实际奖品又发放了。github
访问量上来后发直接走http确定是不行的,因此引入了MQ将将两个系统隔离开,优势:全部发放流程异步执行,活动系统响应更快了;这两个系统就变成弱引用关系,即便奖品系统挂掉,活动系统仍能正常运行;不会出现上面说的bug了;缺点:发放奖励将会有延迟;引入MQ增长了项目复杂度,咱们必须去考虑消息的丢失,重复消费等问题;活动系统须要知道奖品的库存状况。redis
针对上面使用MQ发放奖励会遇到的问题,咱们能够经过面的方案来解决。spring
在数据库建立一张异常消息表。数据库
这样子虽然仍是不能彻底杜绝消息丢失,可是绝大部分状况下是没有问题的。dom
为每一个消息生成业务流水号,将流水号和发放里的参数一块儿发送到奖品系统,奖品系统在发放奖励的时候先判断这个流水号是否存在,存在就表示该奖品已经发过来直接返回发放成功,若是没有就进行发放奖励操做。异步
咱们在配置活动的时候会将奖品的库存放到咱们活动系统,在发MQ消息以前回去判断是否有剩余库存,若是没有直接返回奖励领完了,若是有才回去发MQ消息。扣减库存能够参考基于redis实现的扣减库存。ide
下面是引入MQ事后咱们系统的流程图 spring-boot
/** * Rabbit 发送消息 * * @author yuhao.wang */ @Service public class RabbitSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback, InitializingBean { private final Logger logger = LoggerFactory.getLogger(RabbitSender.class); /** * Rabbit MQ 客户端 */ @Autowired private RabbitTemplate rabbitTemplate; /** * 系统配置 */ @Autowired private SystemConfig systemConfig; /** * 发送MQ消息 * * @param exchangeName 交换机名称 * @param routingKey 路由名称 * @param message 发送消息体 */ public void sendMessage(String exchangeName, String routingKey, Object message) { Assert.notNull(message, "message 消息体不能为NULL"); Assert.notNull(exchangeName, "exchangeName 不能为NULL"); Assert.notNull(routingKey, "routingKey 不能为NULL"); // 获取CorrelationData对象 CorrelationData correlationData = this.correlationData(message); correlationData.setExchange(exchangeName); correlationData.setRoutingKey(routingKey); correlationData.setMessage(message); logger.info("发送MQ消息,消息ID:{},消息体:{}, exchangeName:{}, routingKey:{}", correlationData.getId(), JSON.toJSONString(message), exchangeName, routingKey); // 发送消息 this.convertAndSend(exchangeName, routingKey, message, correlationData); } /** * 用于实现消息发送到RabbitMQ交换器后接收ack回调。 * 若是消息发送确认失败就进行重试。 * * @param correlationData * @param ack * @param cause */ @Override public void confirm(org.springframework.amqp.rabbit.support.CorrelationData correlationData, boolean ack, String cause) { // 消息回调确认失败处理 if (!ack && correlationData instanceof CorrelationData) { CorrelationData correlationDataExtends = (CorrelationData) correlationData; //消息发送失败,就进行重试,重试事后还不能成功就记录到数据库 if (correlationDataExtends.getRetryCount() < systemConfig.getMqRetryCount()) { logger.info("MQ消息发送失败,消息重发,消息ID:{},重发次数:{},消息体:{}", correlationDataExtends.getId(), correlationDataExtends.getRetryCount(), JSON.toJSONString(correlationDataExtends.getMessage())); // 将重试次数加一 correlationDataExtends.setRetryCount(correlationDataExtends.getRetryCount() + 1); // 重发发消息 this.convertAndSend(correlationDataExtends.getExchange(), correlationDataExtends.getRoutingKey(), correlationDataExtends.getMessage(), correlationDataExtends); } else { //消息重试发送失败,将消息放到数据库等待补发 logger.warn("MQ消息重发失败,消息入库,消息ID:{},消息体:{}", correlationData.getId(), JSON.toJSONString(correlationDataExtends.getMessage())); // TODO 保存消息到数据库 } } else { logger.info("消息发送成功,消息ID:{}", correlationData.getId()); } } /** * 用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调。 * 基本上来讲线上不可能出现这种状况,除非手动将已经存在的队列删掉,不然在测试阶段确定能测试出来。 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { logger.error("MQ消息发送失败,replyCode:{}, replyText:{},exchange:{},routingKey:{},消息体:{}", replyCode, replyText, exchange, routingKey, JSON.toJSONString(message.getBody())); // TODO 保存消息到数据库 } /** * 消息相关数据(消息ID) * * @param message * @return */ private CorrelationData correlationData(Object message) { return new CorrelationData(UUID.randomUUID().toString(), message); } /** * 发送消息 * * @param exchange 交换机名称 * @param routingKey 路由key * @param message 消息内容 * @param correlationData 消息相关数据(消息ID) * @throws AmqpException */ private void convertAndSend(String exchange, String routingKey, final Object message, CorrelationData correlationData) throws AmqpException { try { rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData); } catch (Exception e) { logger.error("MQ消息发送异常,消息ID:{},消息体:{}, exchangeName:{}, routingKey:{}", correlationData.getId(), JSON.toJSONString(message), exchange, routingKey, e); // TODO 保存消息到数据库 } } @Override public void afterPropertiesSet() throws Exception { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } }
生产者端使用ConfirmCallback和ReturnCallback回调机制,最大限度的保证消息不丢失,对原有CorrelationData类进行扩展,来实现消息的重发,具体请看源码。测试
/** * 发放优惠券的MQ处理 * * @author yuhao.wang */ @Service @ConditionalOnClass({RabbitTemplate.class}) public class SendMessageListener { private final Logger logger = LoggerFactory.getLogger(SendMessageListener.class); @RabbitListener(queues = RabbitConstants.QUEUE_NAME_SEND_COUPON) public void process(SendMessage sendMessage, Channel channel, Message message) throws Exception { logger.info("[{}]处理发放优惠券奖励消息队列接收数据,消息ID:{},消息体:{}", RabbitConstants.QUEUE_NAME_SEND_COUPON, message.getMessageProperties().getCorrelationIdString(), JSON.toJSONString(sendMessage)); try { // 参数校验 Assert.notNull(sendMessage, "sendMessage 消息体不能为NULL"); // TODO 处理消息 // 确认消息已经消费成功 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { logger.error("MQ消息处理异常,消息ID:{},消息体:{}", message.getMessageProperties().getCorrelationIdString(), JSON.toJSONString(sendMessage), e); try { // TODO 保存消息到数据库 // 确认消息已经消费成功 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e1) { logger.error("保存异常MQ消息到数据库异常,放到死性队列,消息ID:{}", message.getMessageProperties().getCorrelationIdString()); // 确认消息将消息放到死信队列 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } } } }
消费者端主要作了消息消费失败的容错处理。
https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases
spring-boot-student-rabbitmq 工程