实时性要求较高的解决方案java
缓存数据库双写一致性一般是用于数据实时性要求较高的场景,好比说商品库存服务。web
解决思路:redis
若是不是在读写并发高的场景下,通常采用CacheAsidePattern便可解决。即先删除缓存,再写数据库。spring
读写并发高的场景下。数据库
在读写并发高的场景下,读取和写入的操做是并发的。好比说如今数据库中的库存为100,缓存中的库存也为100。有一个写请求过来,要求修改库存为99。正常状况下是先删除了缓存,而后修改数据库中的数据为99。读请求过来的时候发现缓存中的数据为0,就会去数据库中查询获得99。而后修改缓存中的数据也为99。可是若是写请求的时候还没来得及将数据库中数据修改成99,这时度请求就过来,发现缓存中的数据为空,就去数据库中读取数据为100,而后又从新将缓存中的数据更新为100,这时写请求将数据库中的数据修改完毕,变为99。这就致使了数据不一致的产生缓存
解决:安全
将读写请求串行化。将读写请求都放到队列中操做,保证串行执行。而后再每一个队列上挂一个线程去执行队列中的请求操做并发
实时性要求不高的解决方案(先了解)app
对于实时性要求不高的数据,能够采用异步更新数据的方式。好比说商品详情页,它的数据要求不是实时性很高,可是要求大流量,特别是热点数据的读并发较高,这时候就必须有一个缓存数据生产服务。比方说有一个更新商品的服务去更新了数据库中的详情页面数据,不须要实时反应到页面上。这时候,能够将这个修改数据的请求放到消息队列中,缓存数据生产服务监听着这个消息服务,一旦接收到消息,就须要去更新本身缓存中的数据。异步
若是保证读请求和写请求是针对同一个商品?咱们须要作一个HASH路由,保证同一个商品的请求进入的是同一个内存队列。
每一个队列都对应一个工做线程,工做线程拿到对应的请求,执行对应的操做。
web容器初始化的时候,就须要初始化线程池和内存队列。咱们能够自定义一个监听器,而后注册这个监听器。
新建listener
注册listener
测试
容器初始化的流程已经作好了,如今须要实现具体的怎么去初始化线程池和内存队列。
新建线程池和内存队列的包装类ThreadPoolAndQueueWrapper
这个包装类用于放在监听器中,调用它的init()方法,就能够执行线程池的初始化和队列的初始化。线程开始提交请求工做。
package com.roncoo.eshop.inventory.thread;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.roncoo.eshop.inventory.request.Request;
import com.roncoo.eshop.inventory.request.RequestQueue;
/** * 初始化容器的时候须要初始化线程池和内存队列 * @author Administrator * */
public class ThreadPoolAndQueueWrapper {
private ExecutorService threadPool = Executors.newFixedThreadPool(10);
public ThreadPoolAndQueueWrapper() {
RequestQueue requestQueue = RequestQueue.getInstance();
//初始化的时候就将内存队列集合填满
for (int i = 0; i < 10; i++) {
ArrayBlockingQueue<Request> queue = new ArrayBlockingQueue<Request>(100);
requestQueue.addQueue(queue);
//线程池用于提交 请求处理的工做线程
threadPool.submit(new RequestProcessThread(queue));
}
}
/** * 初始化工做线程池和内存队列的方法,上来就执行 */
public static void init() {
//保证初始化的时候只能初始化一次线程池和内存队列
//采用静态内部类的方式保证线程绝对安全
Singleton.getInstance();
}
private static class Singleton{
private static ThreadPoolAndQueueWrapper instance;
static {
instance = new ThreadPoolAndQueueWrapper();
}
public static ThreadPoolAndQueueWrapper getInstance() {
return instance;
}
}
}
复制代码
请求队列的封装RequestQueue,内部持有请求队列的集合,提供添加队列的方法
package com.roncoo.eshop.inventory.request;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
/** * 请求内存队列封装 * @author Administrator * */
public class RequestQueue {
/** * 内存队列,是一个集合。由于涉及并发,因此使用ArrayBlockingQueeue,队列中存放的是请求(读请求和写请求) */
private List<ArrayBlockingQueue<Request>> queues = new ArrayList<ArrayBlockingQueue<Request>>();
public static RequestQueue getInstance() {
return Singleton.getInstance();
}
/** * 添加一个内存队列 * @param queue */
public void addQueue(ArrayBlockingQueue<Request> queue) {
this.queues.add(queue);
}
/** * 内部静态类的方式保证绝对的线程安全 * @author Administrator * */
private static class Singleton {
private static RequestQueue instance;
static {
instance = new RequestQueue();
}
public static RequestQueue getInstance() {
return instance;
}
}
}
复制代码
须要提交到线程池中的工做线程,用于处理Request请求。而且持有本身的内存队列
package com.roncoo.eshop.inventory.thread;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import com.roncoo.eshop.inventory.request.Request;
/** * 执行请求的工做线程 * @author Administrator * */
public class RequestProcessThread implements Callable<Boolean>{
/** * 本身监控的内存队列 */
private ArrayBlockingQueue<Request> queue;
public RequestProcessThread(ArrayBlockingQueue<Request> queue) {
this.queue = queue;
}
/** * 具体的工做流程 */
@Override
public Boolean call() throws Exception {
while(true) {
break;
}
return true;
}
}
复制代码
请求的封装Request,是一个接口,之后读请求和写请求须要实现这个接口,进行本身的操做逻辑
/** * 请求接口,读请求和写请求要实现这个接口 * @author Administrator * */
public interface Request {
}
复制代码
项目结构
新建实体类ProductInventory
public class ProductInventory {
/** * 商品id */
private Integer productId;
/** * 库存数量 */
private Long inventoryCnt;
public ProductInventory() {
}
public ProductInventory(Integer productId, Long inventoryCnt) {
this.productId = productId;
this.inventoryCnt = inventoryCnt;
}
public Integer getProductId() {
return productId;
}
public void setProductId(Integer productId) {
this.productId = productId;
}
public Long getInventoryCnt() {
return inventoryCnt;
}
public void setInventoryCnt(Long inventoryCnt) {
this.inventoryCnt = inventoryCnt;
}
}
复制代码
Request接口中添加业务方法
public interface Request {
void process();
}
复制代码
库存写请求 ProductInventroyWriteRequest
package com.roncoo.eshop.inventory.request;
import com.roncoo.eshop.inventory.model.ProductInventory;
import com.roncoo.eshop.inventory.service.IProductInventoryService;
/** * 库存写请求 * 写请求执行逻辑:Cache Aside Pattern * 1.先删除缓存 * 2.再更新数据库 * @author Administrator * */
public class ProductInventoryWriteRequest implements Request{
private ProductInventory productInventory;
private IProductInventoryService productInventoryService;
public ProductInventoryWriteRequest(ProductInventory productInventory, IProductInventoryService productInventoryService) {
this.productInventory = productInventory;
this.productInventoryService = productInventoryService;
}
public void process() {
//1.删除缓存
productInventoryService.removeCache(productInventory);
//2.更新数据库
productInventoryService.updateDb(productInventory);
}
}
复制代码
库存读请求 ProductInventroyReadRequest
package com.roncoo.eshop.inventory.request;
import org.springframework.beans.factory.annotation.Autowired;
import com.roncoo.eshop.inventory.model.ProductInventory;
import com.roncoo.eshop.inventory.service.IProductInventoryService;
/** * 商品库存读请求 * 1.查询数据库 * 2.设置缓存 * @author Administrator * */
public class ProductInventoryReadRequest implements Request{
/** * 商品Id */
private Integer productId;
@Autowired
private IProductInventoryService productInventoryService;
public ProductInventoryReadRequest(Integer productId, IProductInventoryService productInventoryService) {
this.productId = productId;
this.productInventoryService = productInventoryService;
}
@Override
public void process() {
//1.从数据库中查询最新商品库存
ProductInventory productInventory = productInventoryService.findProductInventoryByProductId(productId);
//2.将商品库存设置到redis缓存中
productInventoryService.setProductInventoryToCache(productInventory);
}
}
复制代码
项目结构:
这一步的操做主要是将过来的请求根据商品id路由到对应的内存队列中。接受的参数是请求.
我的理解叫service这个名称不太好,换个名称不如叫作接口路由代理
service接口 RequestAsyncServiceImpl
package com.roncoo.eshop.inventory.service.impl;
import java.util.concurrent.ArrayBlockingQueue;
import org.springframework.stereotype.Service;
import com.roncoo.eshop.inventory.request.Request;
import com.roncoo.eshop.inventory.request.RequestQueue;
import com.roncoo.eshop.inventory.service.RequestAsyncService;
/** * 处理请求的异步service * 1.将请求路由到不一样的内存队列 * 2.将请求放入到内存队列中 * @author Administrator * */
@Service("requestAsyncService")
public class RequestAsyncServiceImpl implements RequestAsyncService{
@Override
public void process(Request request) {
try {
//作请求的路由,根据每一个请求的商品id,路由到对应的内存队列中
ArrayBlockingQueue<Request> queue = getRoutingQueue(request.getProductId());
queue.put(request);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private ArrayBlockingQueue<Request> getRoutingQueue(Integer productId){
RequestQueue requestQueue = RequestQueue.getInstance();
//先获取productId的hash值
String key = String.valueOf(productId);
int h;
int hash = (key == null) ?0 : (h = key.hashCode()) ^ (h >>> 16);
// 对hash取模,将hash值路由到指定的内存队列
// 好比内存队列大小8
// 用内存队列的数量对hash值取模以后, 结果必定是在0-7之间
// 任何一个商品id都会被固定路由到一样的一个内存队列中去
int index = (requestQueue.queueSize() - 1) & hash;
return requestQueue.getQueue(index);
}
}
复制代码
public class RequestProcessThread implements Callable<Boolean>{
/** * 本身监控的内存队列 */
private ArrayBlockingQueue<Request> queue;
public RequestProcessThread(ArrayBlockingQueue<Request> queue) {
this.queue = queue;
}
/** * 具体的工做流程 */
@Override
public Boolean call() throws Exception {
while(true) {
//从本身监控的内存队列中拿出请求
Request request = queue.take();
//执行操做
request.process();
break;
}
return true;
}
}
复制代码
工程结构:
主要是读请求,要考虑在200ms以内不断循环,从缓存中获取数据。若是200ms内没有,再去数据库查询
package com.roncoo.eshop.inventory.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import com.roncoo.eshop.inventory.Response.Response;
import com.roncoo.eshop.inventory.model.ProductInventory;
import com.roncoo.eshop.inventory.request.ProductInventoryWriteRequest;
import com.roncoo.eshop.inventory.request.Request;
import com.roncoo.eshop.inventory.service.IProductInventoryService;
import com.roncoo.eshop.inventory.service.RequestAsyncService;
/** * 商品库存controller * @author Administrator * */
@Controller
public class ProductInventoryController {
@Autowired
private RequestAsyncService requestAsyncService;
@Autowired
private IProductInventoryService productInventoryService;
@RequestMapping("/updateProductInventory")
@ResponseBody
public Response updateProductInventory(ProductInventory productInventory) {
Response response = null;
try {
Request request = new ProductInventoryWriteRequest(productInventory, productInventoryService);
requestAsyncService.process(request);
response = new Response(Response.SUCCESS);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return response;
}
@RequestMapping("/getProductInventory")
@ResponseBody
public ProductInventory getProductInventory(Integer productId) {
ProductInventory productInventory = null;
try {
Request request = new ProductInventoryWriteRequest(productInventory, productInventoryService);
requestAsyncService.process(request);
//把读请求交给service异步处理之后,须要等待一会
//等待前面库存更新的操做,同时缓存舒心的操做
//若是等待的时间超过了200ms,那么就本身去数据库中查询
long startTime = System.currentTimeMillis();
long endTime = 0L;
long waitTime = 0L;
while(true) {
if(waitTime > 200) {
break;
}
//尝试从缓存中获取数据
productInventory = productInventoryService.getProductInventoryCache(productId);
//若是有数据,就返回数据
if(productInventory != null) {
return productInventory;
}
else {
Thread.sleep(20);
endTime = System.currentTimeMillis();
waitTime = endTime - startTime;
}
}
//若是在规定的时间内(通常是200ms)没有,那么就尝试本身本身去数据库获取
productInventory = productInventoryService.findProductInventoryByProductId(productId);
if(productInventory != null) {
return productInventory;
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return new ProductInventory(productId, -1L);
}
}
复制代码