再多的话就不说了,这个是接着上一讲: 【一块儿学设计模式】状态模式+装饰器模式+简单工厂模式实战:(一)提交个订单我到底经历了什么鬼? 一块儿的,一些多余的赘述请先看这个篇文章。html
一图流,仍是上一篇文章中同样的图,接下来咱们就梳理下总结模式、观察者模式、备忘录模式的应用:java
订单中心: 一、订单中心建立订单 二、订单状态流转(状态模式) 三、记录操做日志(装饰器模式+简单工厂模式) 四、订单中心通知 库存中心更新库存mysql
调度中心: 一、库存中心更新本地库存(使用命令模式+模板方法模式+工厂模式) 这个上讲已经说过:[【一块儿学设计模式】命令模式+模板方法+工厂方法实战: 如何优雅的更新商品库存...][5] 二、将更新库存数据放到消息中,调度中心消费消息(中介模式) 三、放入消息队列中,判断队列是否放满了,若是放满了须要创建离线存储(备忘录模式) 四、异步监听消息处理结果(观察者模式)redis
这个模型应该很简单,咱们来一步步拆解 一步步代码分析sql
下单后,订单中心调用库存中心 扣减库存,而后库存中心调用调度中心,调度中心再去自身库存扣减、WMS拣货单生成、发货单生成等一些列调度任务。数据库
为了解耦,库存中心将须要发送的内容放到一个内存队列中,调度中心异步去消费消息。设计模式
库存中心提供给订单中心接口app
/** * 通知库存中心,“提交订单”事件发生了 * @param orderDTO 订单DTO * @return 处理结果 */ @Override public Boolean informSubmitOrderEvent(OrderInfoDTO orderDTO) { try { // 更新本地库存 // do logic // 发送异步消息到内存队列 StockUpdateMessage message = new StockUpdateMessage(); message.setId(UUID.randomUUID().toString().replace("-", "")); message.setOperation(GoodsStockUpdateOperation.SUBMIT_ORDER); message.setParameter(orderDTO); goodsStockUpdateQueue.put(message); // 监听异步处理结果 goodsStockUpdateManager.observe(message.getId()); } catch (Exception e) { logger.error("error", e); return false; } return true; }
自定义一个内存队列dom
/** * 商品库存更新消息的队列接口 * @author wangmeng * */ public interface StockUpdateQueue { /** * 将一个消息放入队列 * @param message 消息 * @throws Exception */ void put(StockUpdateMessage message) throws Exception; /** * 直接将消息放入队列 * @param message * @throws Exception */ void putDirect(StockUpdateMessage message) throws Exception; /** * 从队列中取出一个消息 * @return * @throws Exception */ StockUpdateMessage take() throws Exception; /** * 获取队列大小 * @return * @throws Exception */ Integer size() throws Exception; } /** * 商品库存更新队列实现类 * @author wangmeng * */ @Component public class StockUpdateQueueImpl implements StockUpdateQueue { private static final Integer QUEUE_MAX_SIZE = 1000; /** * 离线存储管理组件 */ @Autowired private OfflineStorageManager offlineStorageManager; /** * 商品库存更新队列 */ private ArrayBlockingQueue<StockUpdateMessage> queue = new ArrayBlockingQueue<StockUpdateMessage>(QUEUE_MAX_SIZE); /** * 将一个消息放入队列 * @param message 消息 * @throws Exception */ @Override public void put(StockUpdateMessage message) throws Exception { queue.put(message); } /** * 从队列中取出一个消息 * @return * @throws Exception */ @Override public StockUpdateMessage take() throws Exception { return queue.take(); } /** * 直接将消息放入队列 * @param message * @throws Exception */ @Override public void putDirect(StockUpdateMessage message) throws Exception { queue.put(message); } /** * 获取队列大小 * @return * @throws Exception */ @Override public Integer size() throws Exception { return queue.size(); } }
自定义消息体异步
/** * 商品库存更新消息 * @author wangmeng * */ @Data public class StockUpdateMessage { /** * id */ private String id; /** * 商品库存更新操做 */ private Integer operation; /** * 核心参数数据 */ private Object parameter; }
调度中心消息消费者
/** * 库存更新消息消费者 * @author wangmeng * */ @Component public class ScheduleStockUpdateMessageConsumer extends Thread { private static final Logger logger = LoggerFactory.getLogger( ScheduleStockUpdateMessageConsumer.class); /** * 库存更新消息队列 */ @Autowired private StockUpdateQueue stockUpdateQueue; /** * 调度中心接口 */ @Autowired private ScheduleService scheduleService; /** * 库存中心的消息管理器 */ @Autowired private StockUpdateResultManager stockUpdateResultManager; /** * 消费库存更新消息 */ @Override public void run() { while(true) { try { StockUpdateMessage message = stockUpdateQueue.take(); if(!isOrderRelatedMessage(message)) { continue; } OrderInfoDTO order = getOrderFromMessage(message); processMessage(message, order); stockUpdateResultManager.inform(message.getId(), true); } catch (Exception e) { logger.error("error", e); } } } /** * 是不是订单相关的操做 * @param message 消息 * @return 是不是订单相关的操做 * @throws Exception */ private Boolean isOrderRelatedMessage(StockUpdateMessage message) throws Exception { return GoodsStockUpdateOperation.SUBMIT_ORDER.equals(message.getOperation()) || GoodsStockUpdateOperation.CANCEL_ORDER.equals(message.getOperation()) || GoodsStockUpdateOperation.PAY_ORDER.equals(message.getOperation()); } /** * 从消息中获取订单 * @param message 消息 * @return 订单 * @throws Exception */ private OrderInfoDTO getOrderFromMessage(StockUpdateMessage message) throws Exception { return (OrderInfoDTO) message.getParameter(); } /** * 处理消息 * @param order 订单 * @return 处理结果 * @throws Exception */ private Boolean processMessage(StockUpdateMessage message, OrderInfoDTO order) throws Exception { if(GoodsStockUpdateOperation.SUBMIT_ORDER.equals(message.getOperation())) { return scheduleService.informSubmitOrderEvent(order); } else if(GoodsStockUpdateOperation.CANCEL_ORDER.equals(message.getOperation())) { return scheduleService.informCancelOrderEvent(order); } else if(GoodsStockUpdateOperation.PAY_ORDER.equals(message.getOperation())) { return scheduleService.informPayOrderEvent(order); } return false; } }
这里咱们用的是一个内存阻塞队列,那么咱们就须要考虑若是消费者出现异常或者消费过慢的状况致使消息阻塞该怎么办?
这里咱们使用备忘录模式 记录队列中队列是否满载,若是是则加入到离线存储,保存到db中。若是队列恢复size=0 再将离线数据放入队列中。
消息放入队列
/** * 将一个消息放入队列 * @param message 消息 * @throws Exception */ public void put(StockUpdateMessage message) throws Exception { // 每次要往内存队列放消息以前,先检查一下离线存储标识 // 若是触发了离线存储,直接就往离线存储去写入,不要走后面的逻辑了 // 写完离线存储以后,须要检查一下内存队列的大小,若是内存队列已经清零,则启动一个后台线程 // 让后台线程去将离线存储中的数据恢复写入内存队列中 if(offlineStorageManager.getOffline()) { offlineStorageManager.store(message); if(queue.size() == 0) { new OfflineResumeThread(offlineStorageManager, this).start(); } return; } // 若是内存队列已经满了,此时就触发离线存储 if(QUEUE_MAX_SIZE.equals(queue.size())) { offlineStorageManager.store(message); offlineStorageManager.setOffline(true); return; } queue.put(message); }
离线存储管理器
/** * 离线存储管理组件接口 * @author wangmeng * */ public interface OfflineStorageManager { /** * 离线存储库存更新消息 * @param message 库存更新消息 * @throws Exception */ void store(StockUpdateMessage message) throws Exception; /** * 获取离线存储标识 * @return 离线存储标识 * @throws Exception */ Boolean getOffline() throws Exception; /** * 设置离线存储标识 * @param offline 离线存储标识 * @throws Exception */ void setOffline(Boolean offline) throws Exception; /** * 所谓的迭代器模式,何时用? * * 其实只有一个场景,就是若是你须要基于一些不支持迭代的数据,来让咱们业务代码进行迭代 * 那么你本身就要去实现基于那个数据的一套迭代代码 * 以迭代器的方式返回回去给业务方,来经过你定义的迭代器,进行数据的迭代 * * mysql数据库,自己是不支持迭代式访问的,可是咱们能够本身实现一套基于mysql的迭代访问的代码 * 把一个迭代器给返回回去 * * 好比有的时候,咱们可能还须要基于es、redis的数据,来提供业务方迭代式访问的功能,那么此时就只能咱们本身 * 去封装迭代器,在里面封装基于es、redis的迭代访问数据的逻辑 * */ /** * 获取迭代器 * @return 迭代器 * @throws Exception */ OfflineStorageIterator iterator() throws Exception; /** * 批量删除库存更新消息 * @param stockUpdateMessages 库存更新消息 * @throws Exception */ void removeByBatch(List<StockUpdateMessage> stockUpdateMessages) throws Exception; } /** * 离线存储管理组件 * @author wangmeng * */ @Component public class OfflineStorageManagerImpl implements OfflineStorageManager { /** * 库存更新消息管理模块DAO组件 */ @Autowired private StockUpdateMessageDAO stockUpdateMessageDAO; /** * 是否触发离线存储的标识 */ private Boolean offline = false; /** * 离线存储库存更新消息 * @param message 库存更新消息 * @throws Exception */ @Override public void store(StockUpdateMessage message) throws Exception { StockUpdateMessageDO stockUpdateMessageDO = createStockUpdateMessageDO(message); stockUpdateMessageDAO.save(stockUpdateMessageDO); } /** * 建立库存更新消息DO对象 * @param message 库存更新消息 * @return 库存更新消息DO对象 * @throws Exception */ private StockUpdateMessageDO createStockUpdateMessageDO( StockUpdateMessage message) throws Exception { StockUpdateMessageDO stockUpdateMessageDO = new StockUpdateMessageDO(); stockUpdateMessageDO.setMessageId(message.getId()); stockUpdateMessageDO.setOperation(message.getOperation()); stockUpdateMessageDO.setParameter(JSONObject.toJSONString(message.getParameter())); stockUpdateMessageDO.setParamterClazz(message.getParameter().getClass().getName()); stockUpdateMessageDO.setGmtCreate(new Date()); stockUpdateMessageDO.setGmtModified(new Date()); return stockUpdateMessageDO; } /** * 获取离线存储标识 * @return 离线存储标识 * @throws Exception */ @Override public Boolean getOffline() throws Exception { return offline; } /** * 设置离线存储标识 * @param offline 离线存储标识 * @throws Exception */ @Override public void setOffline(Boolean offline) throws Exception { this.offline = offline; } /** * 批量删除库存更新消息 * @param stockUpdateMessages 库存更新消息 * @throws Exception */ @Override public void removeByBatch(List<StockUpdateMessage> stockUpdateMessages) throws Exception { StringBuilder builder = new StringBuilder(""); for(int i = 0; i < stockUpdateMessages.size(); i++) { builder.append(stockUpdateMessages.get(i).getId()); if(i < stockUpdateMessages.size() - 1) { builder.append(","); } } stockUpdateMessageDAO.removeByBatch(builder.toString()); } /** * 获取离线数据迭代器 * @throws Exception */ @Override public OfflineStorageIterator iterator() throws Exception { return new OfflineStorageIteratorImpl(); } /** * 离线数据迭代器 * @author zhonghuashishan * */ public class OfflineStorageIteratorImpl implements OfflineStorageIterator { /** * 判断是否还有下一批库存更新消息 * @return 是否还有下一批库存更新消息 * @throws Exception */ @Override public Boolean hasNext() throws Exception { return stockUpdateMessageDAO.count().equals(0L) ? false : true; } /** * 获取下一批库存更新消息 * @return 下一批库存更新消息 * @throws Exception */ @Override public List<StockUpdateMessage> next() throws Exception { List<StockUpdateMessage> stockUpdateMessages = new ArrayList<StockUpdateMessage>(); List<StockUpdateMessageDO> stockUpdateMessageDOs = stockUpdateMessageDAO.listByBatch(); for(StockUpdateMessageDO stockUpdateMessageDO : stockUpdateMessageDOs) { StockUpdateMessage stockUpdateMessage = new StockUpdateMessage(); stockUpdateMessage.setId(stockUpdateMessageDO.getMessageId()); stockUpdateMessage.setOperation(stockUpdateMessageDO.getOperation()); stockUpdateMessage.setParameter(JSONObject.parseObject(stockUpdateMessageDO.getParameter(), Class.forName(stockUpdateMessageDO.getParamterClazz()))); stockUpdateMessages.add(stockUpdateMessage); } return stockUpdateMessages; } } }
离线数据恢复类
/** * 离线数据恢复线程 * @author wangmeng * */ public class OfflineResumeThread extends Thread { private static final Logger logger = LoggerFactory.getLogger(OfflineResumeThread.class); /** * 离线存储管理组件 */ private OfflineStorageManager offlineStorageManager; /** * 库存更新队列 */ private StockUpdateQueue stockUpdateQueue; /** * 构造函数 * @param offlineStorageManager 离线存储管理组件 */ public OfflineResumeThread(OfflineStorageManager offlineStorageManager, StockUpdateQueue stockUpdateQueue) { this.offlineStorageManager = offlineStorageManager; this.stockUpdateQueue = stockUpdateQueue; } /** * 执行线程 */ @Override public void run() { try { // 若是表中还有数据的话 OfflineStorageIterator offlineStorageIterator = offlineStorageManager.iterator(); while(offlineStorageIterator.hasNext()) { try { // 每次就从mysql中查询50条数据,批量查询,批量处理,批量删除 List<StockUpdateMessage> stockUpdateMessages = offlineStorageIterator.next(); // 将这批数据写入内存队列中 for(StockUpdateMessage message : stockUpdateMessages) { stockUpdateQueue.putDirect(message); } // 批量删除这批数据 offlineStorageManager.removeByBatch(stockUpdateMessages); } catch (Exception e) { logger.error("error", e); } } // 此时mysql中的数据所有恢复完,更新内存标识 offlineStorageManager.setOffline(false); } catch (Exception e) { logger.error("error", e); } } }
咱们在上面 其实已经有了一端代码 是描述异步监听消费结果的,这里再来具体贴下 观察者、被观察者的代码。
被观察者
/** * 商品库存更新结果观察目标 * @author wangmeng * */ public class StockUpdateObservable extends Observable { /** * 消息id */ private String messageId; /** * 构造函数 * @param messageId 消息id */ public StockUpdateObservable(String messageId) { this.messageId = messageId; } /** * 设置商品库存更新结果 * @param result 商品库存更新结果 */ public void setResult(Boolean result) { StockUpdateResult goodsStockUpdateResult = new StockUpdateResult(); goodsStockUpdateResult.setMessageId(messageId); goodsStockUpdateResult.setResult(result); this.setChanged(); this.notifyObservers(goodsStockUpdateResult); } public String getMessageId() { return messageId; } }
观察者
/** * 商品库存更新结果观察者 * @author wangmeng * */ @Component public class StockUpdateObserver implements Observer { private static final Logger logger = LoggerFactory.getLogger( StockUpdateObserver.class); /** * 通知异步处理结果 */ @Override public void update(Observable o, Object arg) { StockUpdateResult result = (StockUpdateResult) arg; logger.info("商品库存更新消息[messageId=" + result.getMessageId() + "]" + "的异步处理结果为:" + result.getResult()); } }
添加观察者
observe方法是订单中心通知库存中心更新库存的时候调用的,库存中心给调度中心发送异步消息,而后将这个消息的messageId加入到观察者中。
inform方法是调度中心的消息消费者调用的,若是消费成功,调度中心会调用inform方法,设置result=true
```java /** * 商品库存更新结果管理组件 * @author wangmeng * */ @Component public class StockUpdateResultManagerImpl implements StockUpdateResultManager { /** * 商品库存更新结果map */ private Map<String, StockUpdateObservable> observableMap = new ConcurrentHashMap<String, StockUpdateObservable>(); /** * 商品库存更新结果观察者 */ @Autowired private StockUpdateObserver observer; /** * 设置对商品库存更新结果的观察 * @param messageId 消息id * @param result 商品库存更新结果 * @param observer 商品库存更新结果的观察者 */ @Override public void observe(String messageId) { StockUpdateObservable observable = new StockUpdateObservable(messageId); observable.addObserver(observer); observableMap.put(messageId, observable); } /** * 获取商品库存更新结果的观察目标 * @param messageId 商品库存更新消息id * @return 商品库存更新结果的观察目标 */ @Override public void inform(String messageId, Boolean result) { StockUpdateObservable observable = observableMap.get(messageId); observable.setResult(result); observableMap.remove(messageId); } /** * 获取库存更新结果观察目标 * @param messageId 消息id * @return */ @Override public StockUpdateObservable getObservable(String messageId) { return observableMap.get(messageId); } } ```
本篇内容有点多,主要是分为了三大块,而后结合了中介者模式、备忘录模式、观察者模式。
其中在离线消息恢复的类中仍是用了迭代器模式。
代码作了简单的抽离,我相信读起来仍是很轻松的,设计模式系列要先告一段落了,这几篇文章涉及了 一些经常使用的设计模式,后面若是有新的模式还会继续连载更新。
本文章首发自本人博客:https://www.cnblogs.com/wang-meng 和公众号:壹枝花算不算浪漫,如若转载请标明来源!
感兴趣的小伙伴可关注我的公众号:壹枝花算不算浪漫
原文出处:https://www.cnblogs.com/wang-meng/p/12079082.html