遇到问题:java
最近作微信支付,项目上线一阵,发现一个问题。有一条订单流水竟然在数据库的出现两次。这个问题很是严重。spring
查看微信回调系统的接口代码发现代码是没错的(正常状况下),而此次遇到非正常状况了数据库
缘由:微信支付成功后回调咱们系统接口在极短期回调了2次,微信官方文档说明了,是最短15s回调一次。浏览器
前几天微信支付抽风了,可能业务出现了波动。缓存
简单来讲就是在并发状况下没有作数据惟一性处理,无论怎么样这类并发状况都是有必要的处理。tomcat
解决方式:使用线程池+队列服务器
项目基于Spring,若是不用spring须要本身把微信
ThreadPoolManager.java
改为单例模式并发
1.写一个Controller(Spring mvc)mvc
/** * @author HeyS1 * @date 2016/12/1 * @description */ @Controller public class ThreadPoolController { @Autowired ThreadPoolManager tpm; @RequestMapping("/pool") public @ResponseBody Object test() { for (int i = 0; i < 500; i++) { //模拟并发500条记录 tpm.processOrders(Integer.toString(i)); } return "ok"; } }
2.线程池管理
/** * @author HeyS1 * @date 2016/12/1 * @description threadPool订单线程池, 处理订单 * scheduler 调度线程池 用于处理订单线程池因为超出线程范围和队列容量而不能处理的订单 */ @Component public class ThreadPoolManager implements BeanFactoryAware { private static Logger log = LoggerFactory.getLogger(ThreadPoolManager.class); private BeanFactory factory;//用于从IOC里取对象 // 线程池维护线程的最少数量 private final static int CORE_POOL_SIZE = 2; // 线程池维护线程的最大数量 private final static int MAX_POOL_SIZE = 10; // 线程池维护线程所容许的空闲时间 private final static int KEEP_ALIVE_TIME = 0; // 线程池所使用的缓冲队列大小 private final static int WORK_QUEUE_SIZE = 50; // 消息缓冲队列 Queue<Object> msgQueue = new LinkedList<Object>(); //用于储存在队列中的订单,防止重复提交 Map<String, Object> cacheMap = new ConcurrentHashMap<>(); //因为超出线程范围和队列容量而使执行被阻塞时所使用的处理程序 final RejectedExecutionHandler handler = new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { //System.out.println("太忙了,把该订单交给调度线程池逐一处理" + ((DBThread) r).getMsg()); msgQueue.offer(((DBThread) r).getMsg()); } }; // 订单线程池 final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler); // 调度线程池。此线程池支持定时以及周期性执行任务的需求。 final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5); // 访问消息缓存的调度线程,每秒执行一次 // 查看是否有待定请求,若是有,则建立一个新的AccessDBThread,并添加到线程池中 final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate(new Runnable() { @Override public void run() { if (!msgQueue.isEmpty()) { if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) { System.out.print("调度:"); String orderId = (String) msgQueue.poll(); DBThread accessDBThread = (DBThread) factory.getBean("dBThread"); accessDBThread.setMsg(orderId); threadPool.execute(accessDBThread); } // while (msgQueue.peek() != null) { // } } } }, 0, 1, TimeUnit.SECONDS); //终止订单线程池+调度线程池 public void shutdown() { //true表示若是定时任务在执行,当即停止,false则等待任务结束后再中止 System.out.println(taskHandler.cancel(false)); scheduler.shutdown(); threadPool.shutdown(); } public Queue<Object> getMsgQueue() { return msgQueue; } //将任务加入订单线程池 public void processOrders(String orderId) { if (cacheMap.get(orderId) == null) { cacheMap.put(orderId,new Object()); DBThread accessDBThread = (DBThread) factory.getBean("dBThread"); accessDBThread.setMsg(orderId); threadPool.execute(accessDBThread); } } //BeanFactoryAware @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { factory = beanFactory; } }
3.线程池中工做的线程
//线程池中工做的线程 @Component @Scope("prototype")//spring 多例 public class DBThread implements Runnable { private String msg; private Logger log = LoggerFactory.getLogger(DBThread.class); @Autowired SystemLogService systemLogService; @Override public void run() { //模拟在数据库插入数据 Systemlog systemlog = new Systemlog(); systemlog.setTime(new Date()); systemlog.setLogdescribe(msg); //systemLogService.insert(systemlog); log.info("insert->" + msg); } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } }
浏览器输入地址127.0.0.1/pool
几秒后关闭tomcat。
模拟500条数据,订单线程池处理了117条。调度线程池处理5条
关闭tomcat,后还有378条未处理(这里的实现须要用到spring监听器)。加起来一共500
OK。完毕
spring监听器,监听tomcat关闭事件:
public class MyApplicationListener implements ApplicationListener<ApplicationEvent> { @Autowired ThreadPoolManager threadPoolManager; @Override public void onApplicationEvent(ApplicationEvent event) { if (event instanceof ContextClosedEvent) { XmlWebApplicationContext x = (XmlWebApplicationContext) event.getSource(); //防止执行两次。root application context 没有parent,他就是老大 if (x.getDisplayName().equals("Root WebApplicationContext")) { threadPoolManager.shutdown(); Queue q = threadPoolManager.getMsgQueue(); System.out.println("关闭了服务器,还有未处理的信息条数:" + q.size()); } } else if (event instanceof ContextRefreshedEvent) { // System.out.println(event.getClass().getSimpleName()+" 事件已发生!"); } else if (event instanceof ContextStartedEvent) { // System.out.println(event.getClass().getSimpleName()+" 事件已发生!"); } else if (event instanceof ContextStoppedEvent) { // System.out.println(event.getClass().getSimpleName()+" 事件已发生!"); } else { // System.out.println("有其它事件发生:"+event.getClass().getName()); } } }
spring配置一下
<bean id="springStartListener" class="com.temp.MyApplicationListener"></bean>