RabbitMQ实战二(消峰限流补充)

Hello,我是一名在互联网捡破烂的程序员,最近破烂还挺好捡的,天天都是东逛逛西逛逛,收了不少的破烂呢。。。java

收废铁了,十块一斤。快拿来卖哦,什么烂电冰箱,烂电视机,无论什么破烂我都要。。。程序员

天天骑着个人烂三轮车,天天都是活的苟且偷生的,我好可怜。。。web

呜呜呜呜redis

无论有钱木钱,都进来看一看瞧一瞧哦。。spring

好了~~~数据库

今天咱们来接着讲,若是你是直接来阅读的这一期的话,那你要去看上一期的内容哦,这样咱们才能够衔接起来的。apache

限流操做

咱们就直接从Controller层开始讲解了哈json

1. 修改咱们的OrderController
OrderController
package com.example.rabbitmq.controller;
​
import com.example.rabbitmq.common.ApiResponse;
import com.example.rabbitmq.entity.TestOrder;
import com.example.rabbitmq.service.OrderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
​
​
/**
 * 订单Controller
 */
@RestController
@RequestMapping("order")
public class OrderController {
​
​
 @Autowired(required = false)
 private OrderService orderService;
​
 private static Integer count = 0;
​
 private static final Logger LOGGER = LoggerFactory.getLogger(OrderController.class);
​
 /**
 * 使用RabbitMQ限流建立订单
 * @return
 */
 @PostMapping("create/{goodsId}")
 public ApiResponse create(@PathVariable("goodsId") Long goodsId){
​
 ApiResponse apiResponse = this.orderService.create(goodsId);
​
 LOGGER.info("流量请求:" + count++);
​
 return apiResponse;
 }
​
 /**
 * 无RabbitMQ建立订单
 * @return
 */
 @PostMapping("/save/{goodsId}")
 public ApiResponse save(@PathVariable("goodsId") Long goodsId){
​
 ApiResponse apiResponse = this.orderService.save(goodsId);
​
 LOGGER.info("流量请求:" + count++);
​
 return apiResponse;
 }
}

其中咱们只添加了一个使用限流操做的接口,和普通的接口同样同样的windows

2. 修改咱们的订单Service
OrderService
package com.example.rabbitmq.service;
​
import com.example.rabbitmq.common.ApiResponse;
import com.example.rabbitmq.entity.TestOrder;
​
import java.util.List;
​
public interface OrderService {
​
 /**
 * 使用RabbitMQ限流建立订单
 * @return
 */
 ApiResponse create(Long goodsId);
​
 /**
 * 无RabbitMQ消峰限流
 * @return
 */
 ApiResponse save(Long goodsId);
​
 /**
 * 建立订单
 * @param testOrder
 */
 void createOrder(TestOrder testOrder);
}
​

其中的create方法主要是将咱们的请求所有接到消息队列中api

真正建立订单的方法是createOrder

3. 修改咱们的订单ServiceImpl
OrderServiceImpl
package com.example.rabbitmq.service.impl;
​
import com.example.rabbitmq.common.ApiResponse;
import com.example.rabbitmq.entity.TestGoods;
import com.example.rabbitmq.entity.TestOrder;
import com.example.rabbitmq.mapper.GoodsMapper;
import com.example.rabbitmq.mapper.OrderMapper;
import com.example.rabbitmq.service.OrderService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
​
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.UUID;
​
/**
 * OrderService
 */
@Service
public class OrderServiceImpl implements OrderService {
​
 @Autowired(required = false)
 private AmqpTemplate amqpTemplate;
​
 @Autowired(required = false)
 private OrderMapper orderMapper;
​
 @Autowired(required = false)
 private GoodsMapper goodsMapper;
​
 /**
 * 使用RabbitMQ限流建立订单
 *
 * @return
 */
 @Override
 public ApiResponse create(Long goodsId) {
​
 try {
​
 // 判断参数
 if (goodsId == null){
​
 return new ApiResponse().code(444).msg("参数错误");
 }
​
 // 发送消息
 this.amqpTemplate.convertAndSend("order.create",goodsId);
​
 return new ApiResponse().code(200).msg("下单中,请稍后");
​
 }catch (Exception e){
​
 return new ApiResponse().code(500).msg("服务器错误");
 }
​
 }
​
​
 /**
 * 使用RabbitMQ限流建立订单
 *
 * @param testOrder
 * @return
 */
 @Override
 public void createOrder(TestOrder testOrder) {
​
 this.orderMapper.create(testOrder);
 }
​
 /**
 * 无RabbitMQ消峰限流
 *
 * @return
 */
 @Override
 public ApiResponse save(Long goodsId) {
​
 if (goodsId == null){
​
 return new ApiResponse().code(400).msg("参数错误");
 }
​
 // 根据商品Id查询商品
 TestGoods testGoods = this.goodsMapper.selectStockById(goodsId);
​
 // 商品不存在或者商品库存为0
 if (testGoods == null || testGoods.getGoodsStock() <= 0){
​
 return new ApiResponse().code(400).msg("商品不存在或者库存为0");
 }
​
 // 直接添加
 // 建立订单
 TestOrder testOrder = new TestOrder();
​
 // 设置参数
 testOrder.setOrderUserEmail("111@qq.com");
​
 testOrder.setOrderUserName("FC");
​
 testOrder.setOrderDate(new Date());
​
 this.orderMapper.save(testOrder);
​
 // 更新库存
 this.goodsMapper.updateGoodsStock(goodsId);
​
 return new ApiResponse().code(200).msg("订单建立成功");
 }
}

在这里咱们能够清楚的看到了,咱们是怎么将请求所有接入到咱们的消息队列中的

咱们这样作的思想就是,咱们须要有一个中间商来帮助咱们接收消息,那么这个中间商要比咱们的持久层要厉害些,能够接收不少的请求,咱们再慢慢的消费这些消息

4. 消费者
OrderListener
package com.example.rabbitmq.listener;
​
import com.example.rabbitmq.entity.TestGoods;
import com.example.rabbitmq.entity.TestOrder;
import com.example.rabbitmq.service.GoodsService;
import com.example.rabbitmq.service.OrderService;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
​
import java.io.IOException;
import java.util.Date;
import java.util.List;
​
/**
 * 订单请求消息生产者
 */
@Component
public class OrderListener {
​
 private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);
​
 @Autowired(required = false)
 private OrderService orderService;
​
 @Autowired
 private AmqpTemplate amqpTemplate;
​
 @Autowired(required = false)
 private GoodsService goodsService;
​
 /**
 * 建立订单消息监听
 * @param goodsId
 */
 @RabbitListener(bindings = @QueueBinding(
 value = @Queue(value = "ORDER.QUEUE", durable = "true"),
​
 arguments = {@Argument(name = "x-max-length", value = "10"),
 @Argument(name = "dead-letter-exchange",value = "reject-publish")
 },
​
 exchange = @Exchange(value = "ORDER.EXCHANGE", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC),
​
 key = {"order.create"}
 ))
 public void create(Long goodsId, Channel channel, Message message) throws IOException {
​
 try {
​
 if (goodsId == null) {
​
 return;
 }
​
 // 先根据商品Id查询商品库存
 TestGoods goods = this.goodsService.selectGoodsById(goodsId);
​
 if (goods == null || goods.getGoodsStock() <= 0){
​
 return;
​
 }
​
 // 建立订单
 TestOrder testOrder = new TestOrder();
​
 // 设置参数
 testOrder.setOrderUserName("FC");
​
 testOrder.setOrderUserEmail("111@qq.com");
​
 testOrder.setOrderDate(new Date());
​
 // 执行添加
 this.orderService.createOrder(testOrder);
​
 // 更新库存
 this.goodsService.updateGoodsStock(goodsId);
​
 LOGGER.info("消费成功");
​
 } catch (Exception e) {
​
 LOGGER.error("消费失败");
​
 }finally {
​
 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
 }
 }
}

这个就不用我多说了吧。这里面的细枝末节,须要自行查阅。

也能够去参考 RabbitMQ实践应用一

这样就能够简单的实现咱们的消峰限流啦。。。。

咱们开始咱们的测试吧

咱们首先仍是用postman来测试咱们的接口是否可用

咱们能够在控制台看见

这样咱们就简单的消费成功了,库存和订单也都更新和添加成功啦

固然这样咱们也是不可能的,那咱们仍是要去用压力测试来试试能不能顶住呢

仍是用咱们的jmeter来作压力测试,只须要修改接口就行

这样看咱们的就是有条理的执行啦

可是这样咱们的请求就没有所有打在数据库上,这样咱们就能够实现限流啦。。。

咱们再来看咱们的数据库呢?有木有像没有限流的出现超卖的状况呀

哟哟哟,尽然没有出现超卖的状况,那这就算是实现了限流操做

咱们再来看订单是否是100个呢

咱们是从115开始的,看看是否是214结束呢

哇哦,果真是预想的同样。。。。。

咱们再看时间,是在同一时间下的订单。。。

那就证实了咱们的限流操做

OK,这里咱们的消峰限流就所有完成了,可能不是那么完善,也有不少的漏洞。

就算库存为0,流量仍是会分批打到咱们的数据上面,有木有办法,直接舍弃这些流量呢?

这就须要本身思考了,要化为本身的东西才算是真正理解。。。。

总结

从某种意义上说,消费者的限流策略有助于那么处理消息效率高的消费者多消费一些消息,效率低一些的消费者少推送一些消息,从而能够达到能者多劳的目的,尽量发挥消费者处理消息的能力。在项目中,为了缓解生产者和消费者两边效率不平衡的影响,一般会对消费者进行限流处理,保证消费者端正常消费消息,尽量避免服务器崩溃以及宕机现象。

加入Redis缓存实现限流操做

1、为何要加入Redis缓存

咱们在前面实现了简单的限流操做,对用户下订单有了很好的维护及在并发状况下能够撑住。

可是,咱们在前面留了一个问题,当咱们库存为0时,可是咱们的流量还存在不少,虽然是分批打入咱们的数据库,这样对咱们来讲是很不友好的。数据库中根本没有库存了,就不该该将剩余的流量打入到咱们的数据层。

那么我咱们应该怎么解决呢?

这里咱们就加入了 Redis缓存技术,相信你在使用这个技术以前就很了解这个Redis了吧。。。。

2、流程图

流程图不标准,根据本身的理解画的。。。

将就一下吧。。。。

3、代码实现

咱们只须要修改部分的代码

3.1 引入新的依赖

相信你猜就知道了吧

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
3.2 修改咱们的application.yml
#redis配置
 redis:
 host: 192.168.2.4 #主机地址
 database: 4

使用前须要安装Redis,咱们能够在Linux上安装也能够在windows上安装

推荐在Linux上安装,咱们的大部分都是部署在Linux上的

Redis的使用操做

3.3 修改咱们的OrderListener
package com.example.rabbitmq.listener;
​
import com.example.rabbitmq.entity.TestGoods;
import com.example.rabbitmq.entity.TestOrder;
​
import com.example.rabbitmq.service.GoodsService;
import com.example.rabbitmq.service.OrderService;
import com.example.rabbitmq.utils.JsonUtils;
import com.rabbitmq.client.Channel;
​
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
​
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.locks.ReentrantLock;
​
​
/**
 * 订单请求消息生产者
 */
@Component
public class OrderListener {
​
 private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);
​
 @Autowired(required = false)
 private OrderService orderService;
​
 @Autowired(required = false)
 private GoodsService goodsService;
​
 private static String GOODS_NAME = "goods:id:";
​
 @Autowired(required = false)
 private StringRedisTemplate stringRedisTemplate;
​
 private static ReentrantLock lock = new ReentrantLock();
​
 /**
 * 建立订单消息监听
 *
 * @param goodsId
 */
 @RabbitListener(bindings = @QueueBinding(
 value = @Queue(value = "ORDER.QUEUE", durable = "true"),
​
 arguments = {@Argument(name = "x-max-length", value = "10"),
 @Argument(name = "dead-letter-exchange", value = "reject-publish")
 },
​
 exchange = @Exchange(value = "ORDER.EXCHANGE", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC),
​
 key = {"order.create"}
 ))
 public void create(Long goodsId, Channel channel, Message message) throws IOException {
​
 try {
​
 if (goodsId == null) {
​
 return;
 }
​
 String json = this.getRedisData(goodsId);
​
 // Redis缓存中没有命中
 if (StringUtils.isBlank(json)){
​
 /**
 * 上锁
 */
 if (lock.tryLock()){
​
 // 从数据库库中拿到数据
 TestGoods testGoods = this.getDBData(goodsId);
​
 if (testGoods == null || testGoods.getGoodsStock() <= 0){
​
 return;
 }
​
 // 建立订单
 TestOrder testOrder = new TestOrder();
​
 // 设置参数
 testOrder.setOrderUserName("FC");
​
 testOrder.setOrderUserEmail("111@qq.com");
​
 testOrder.setOrderDate(new Date());
​
 // 执行添加
 this.orderService.createOrder(testOrder);
​
 // 更新库存
 testGoods.setGoodsStock(testGoods.getGoodsStock() - 1);
​
 json = JsonUtils.serialize(testGoods);
​
 this.setRedisDate(goodsId, json);
​
 // 释放锁
 lock.unlock();
 }
 } else {
​
 Thread.sleep(100L);
​
 json = this.getRedisData(goodsId);
​
 // 将Json转化为对象
 TestGoods testGoods = JsonUtils.parse(json, TestGoods.class);
​
 if (testGoods == null || testGoods.getGoodsStock() <= 0){
​
 return;
 }
​
 // 建立订单
 TestOrder testOrder = new TestOrder();
​
 // 设置参数
 testOrder.setOrderUserName("FC");
​
 testOrder.setOrderUserEmail("111@qq.com");
​
 testOrder.setOrderDate(new Date());
​
 // 执行添加
 this.orderService.createOrder(testOrder);
​
 // 更新库存
 testGoods.setGoodsStock(testGoods.getGoodsStock() - 1);
​
 json = JsonUtils.serialize(testGoods);
​
 this.setRedisDate(goodsId, json);
 }
​
 } catch (Exception e) {
​
 LOGGER.error("消费失败");
​
 } finally {
​
 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
 }
 }
​
 /**
 * 从Redis中获取数据
 *
 * @param goodsId
 */
 private String getRedisData(Long goodsId) {
​
 // 从缓存中获取数据
 String json = this.stringRedisTemplate.opsForValue().get(GOODS_NAME + goodsId);
​
 return json;
 }
​
 /**
 * 设置Redis缓存数据
 * @param goodsId
 * @param json
 */
 private void setRedisDate(Long goodsId, String json){
​
 this.stringRedisTemplate.opsForValue().set(GOODS_NAME + goodsId, json);
 }
​
 /**
 * 从数据库中取出数据
 * @param goodsId
 * @return
 */
 private TestGoods getDBData(Long goodsId){
​
 TestGoods testGoods = this.goodsService.selectGoodsById(goodsId);
​
 return testGoods;
 }
}

主要代码解释

其中写的三个方法就不用说了吧

getRedisData:从Redis中获取数据

setRedisData:更新Redis中的数据

getDBData:从数据库中获取数据

  • 首先请求过来会带着一个Id,先判断这个Id是否为空
if (goodsId == null) {
 return;
 }

若为空,消息直接丢弃

  • 先从 Redis中获取数据
String json = this.getRedisData(goodsId);
  • 判断获取的数据是否为空,也就是缓存中是否有数据,若没有数据,则上锁,从数据库中取出数据
/**
 * 上锁
 */
  if (lock.tryLock()){
 // 从数据库库中拿到数据
 TestGoods testGoods = this.getDBData(goodsId);
  • 从数据库中拿到数据后,将其放入缓存中,并释放锁
json = JsonUtils.serialize(testGoods);

this.setRedisDate(goodsId, json);

// 释放锁
lock.unlock();
  • 若从缓存中命中数据,那么让其休眠
Thread.sleep(100L);

json = this.getRedisData(goodsId);

OK,就这样简单的修改好咱们的代码了,是否是很简单呢。

4、截图验证

首先来看看咱们的数据库中商品的库存信息

再来看看咱们的订单表

接下来,就直接上咱们的压力测试,此次要使用6000个请求啦,看你还抵不抵得住

首先看咱们的控制台

咱们的消息队列直接将所有的瞬时流量收入囊中

而后咱们在分批处理咱们的流量,也就是下订单啦

OK,咱们来看看咱们的Redis缓存是否正确呢?预判库存为0

哇哦,那还能够,基本实现了

下面才是咱们的重头戏

看看咱们的数据库是否有错呢?

先来看库存,预判为100

为何呢?咱们在下单中,基本上就没有和数据库打交道,把它给撇开了

咱们直接是在缓存中进行的,根本没有更新数据库的商品库存,那确定是100啦

哇哦,果然是诶。。。。

那再来看咱们订单表呢,你有木有超出100条呢

一张图截不完,咱们是从715开始的,看看是不是814结束呢

哇哦,也是诶,那这就对了嘛。。。

咱们仔细一看,这一次咱们下单的时间就有差异了,没有同一时间将下单。。。

5、总结

咱们使用Redis缓存,大大减轻了咱们数据库的压力,查询商品只须要访问一次数据库,查询的数据放入缓存。

订单的下单时间也有所优化。

大家有木有什么优化的呢?能够思考哦,其实还有不少方案

相关文章
相关标签/搜索