Springboot项目,有个需求,须要提供接口,接口调用方每一次调用时,都会上报大量的数据,接口须要知足如下要求:spring
实践发现,即便使用批量保存,接口耗时也高达一秒多,因此须要开启多线程来保存。如今的问题是,若是保证在开启多线程保存的状况下,保证数据的原子性。数据库
标识
来标识“是否有线程操做失败”。标识
设置为失败
,而后继续抛出异常。标识
是失败,也就是“有其余线程有执行失败”,就自定义抛出异常来回滚。标识
是否成功;确保不会出现“还有线程的业务未执行完成,其余线程就已经结束工做”。失败流程以下:多线程
@Slf4j @Component public class AtomicConcurrentTransactionalExecutor { @Autowired private TransactionalWorker transactionalWorker; /** * @param threadWwaitTerminationTimeout * @param runnables */ public boolean execute(int threadWwaitTerminationTimeout, Runnable... runnables) { int threadSize = runnables.length; CyclicBarrier workerCyclicBarrier = new CyclicBarrier(threadSize); AtomicInteger successCounter = new AtomicInteger(threadSize); ExecutorService executorService = Executors.newFixedThreadPool(threadSize); for (Runnable runnable : runnables) { executorService.submit(() -> { try { transactionalWorker.run(workerCyclicBarrier, successCounter, runnable); } catch (Exception e) { log.error("TransactionalWorker current thread execute error before runnable.run!", e); } }); } ThreadUtils.shutdown(executorService, threadWwaitTerminationTimeout, TimeUnit.SECONDS); return successCounter.get() == 0; } /** * @param threadWwaitTerminationTimeout * @param threadPollSize * @param runnable * @return boolean * @author minchin * @date 2020/2/12 12:33 下午 */ public boolean execute(int threadWwaitTerminationTimeout, int threadPollSize, Runnable runnable) { Runnable[] runnables = IntStream.range(0, threadPollSize) .mapToObj(i -> runnable) .toArray(Runnable[]::new); return execute(threadWwaitTerminationTimeout, runnables); } } @Component @Slf4j public class TransactionalWorker { /** * @param workerCyclicBarrier * @param successCounter * @param runnable */ @Transactional(rollbackFor = Exception.class) public void run(CyclicBarrier workerCyclicBarrier, AtomicInteger successCounter, Runnable runnable) { boolean isSuccess = false; try { runnable.run(); successCounter.decrementAndGet(); isSuccess = true; } catch (Exception e) { log.error("TransactionalWorker current thread execute error!", e); isSuccess = false; throw e; } finally { try { // 若是是数据库操做慢致使长时间阻塞,并不会被线程池中断(Interrupt),也就是会等到数据库操做完成以后,进入到这一步,而后直接报超时异常 workerCyclicBarrier.await(); } catch (Exception e) { // 等待其余线程时超时 log.error("TransactionalWorker current thread execute CyclicBarrier.await error!", e); if (isSuccess) { // 要回滚计数,不然:假设所有线程都操做成功,但恰好超时,主线程shutdown线程池后,计数为0,会返回成功 successCounter.incrementAndGet(); } } } if (successCounter.get() != 0) { log.error("TransactionalWorker other thread execute error, create SystemException to rollback!"); throw new SystemException("TransactionalWorker other thread execute error, create SystemException to rollback!"); } } } @Slf4j public class ThreadUtils { private ThreadUtils() { } /** * @param pool * @param awaitTerminationTimeout * @param timeUnit * @return 若是出现异常,则返回false */ public static boolean shutdown(ExecutorService pool, int awaitTerminationTimeout, TimeUnit timeUnit) { try { pool.shutdown(); boolean done = false; try { done = awaitTerminationTimeout > 0 && pool.awaitTermination(awaitTerminationTimeout, timeUnit); } catch (InterruptedException e) { log.error("thread pool awaitTermination error!", e); } if (!done) { pool.shutdownNow(); if(awaitTerminationTimeout > 0) { pool.awaitTermination(awaitTerminationTimeout, timeUnit); } } } catch (Exception e) { log.error("thread pool shutdown error!", e); return false; } return true; } }
return atomicConcurrentTransactionalExecutor.execute(10, 2, // 业务 () -> testService.test1() }
return atomicConcurrentTransactionalExecutor.execute(10, // 业务1 () -> testService.test1(), // 业务2 () -> testService.test2(), }
注意在使用时,同一个类内,调用内部方法,Spring事务不生效的问题。 任务超时的便捷测试还须要严格再测试!