Spring事务在多线程下保证原子性

背景

Springboot项目,有个需求,须要提供接口,接口调用方每一次调用时,都会上报大量的数据,接口须要知足如下要求:spring

  • 数据保存要保证数据原子性:要么所有保存成功,要么所有不保存。
  • 保证接口性能。

实践发现,即便使用批量保存,接口耗时也高达一秒多,因此须要开启多线程来保存。如今的问题是,若是保证在开启多线程保存的状况下,保证数据的原子性。数据库

思路

  • 开启多线程,每一个线程都是使用独立的DB链接。不然因为数据库是串行阻塞操做,最终仍是会变成排队操做数据库。
  • 依赖spring事务异常回滚机制。
  • 有个统一的标识来标识“是否有线程操做失败”。
  • 线程若是出现异常:先捕获异常,将标识设置为失败,而后继续抛出异常。
  • 线程若是没有异常,在执行的最后,判断标识是失败,也就是“有其余线程有执行失败”,就自定义抛出异常来回滚。
  • 经过锁来保证:全部的线程都操做完以后,一块儿判断标识是否成功;确保不会出现“还有线程的业务未执行完成,其余线程就已经结束工做”。

流程图

失败流程以下:
58241153.png多线程

代码

@Slf4j
@Component
public class AtomicConcurrentTransactionalExecutor {
    @Autowired
    private TransactionalWorker transactionalWorker;

    /**
     * @param threadWwaitTerminationTimeout
     * @param runnables
     */
    public boolean execute(int threadWwaitTerminationTimeout, Runnable... runnables) {
        int threadSize = runnables.length;
        CyclicBarrier workerCyclicBarrier = new CyclicBarrier(threadSize);
        AtomicInteger successCounter = new AtomicInteger(threadSize);
        ExecutorService executorService = Executors.newFixedThreadPool(threadSize);
        for (Runnable runnable : runnables) {
            executorService.submit(() -> {
                try {
                    transactionalWorker.run(workerCyclicBarrier, successCounter, runnable);
                } catch (Exception e) {
                    log.error("TransactionalWorker current thread execute error before runnable.run!", e);
                }
            });
        }
        ThreadUtils.shutdown(executorService, threadWwaitTerminationTimeout, TimeUnit.SECONDS);
        return successCounter.get() == 0;
    }

    /**
     * @param threadWwaitTerminationTimeout
     * @param threadPollSize
     * @param runnable
     * @return boolean
     * @author minchin
     * @date 2020/2/12 12:33 下午
     */
    public boolean execute(int threadWwaitTerminationTimeout, int threadPollSize, Runnable runnable) {
        Runnable[] runnables = IntStream.range(0, threadPollSize)
                .mapToObj(i -> runnable)
                .toArray(Runnable[]::new);
        return execute(threadWwaitTerminationTimeout, runnables);
    }
}


@Component
@Slf4j
public class TransactionalWorker {

    /**
     * @param workerCyclicBarrier
     * @param successCounter
     * @param runnable
     */
    @Transactional(rollbackFor = Exception.class)
    public void run(CyclicBarrier workerCyclicBarrier, AtomicInteger successCounter, Runnable runnable) {
        boolean isSuccess = false;
        try {
            runnable.run();
            successCounter.decrementAndGet();
            isSuccess = true;
        } catch (Exception e) {
            log.error("TransactionalWorker current thread execute error!", e);
            isSuccess = false;
            throw e;
        } finally {
            try {
                // 若是是数据库操做慢致使长时间阻塞,并不会被线程池中断(Interrupt),也就是会等到数据库操做完成以后,进入到这一步,而后直接报超时异常
                workerCyclicBarrier.await();
            } catch (Exception e) {
                // 等待其余线程时超时
                log.error("TransactionalWorker current thread execute CyclicBarrier.await error!", e);
                if (isSuccess) {
                    // 要回滚计数,不然:假设所有线程都操做成功,但恰好超时,主线程shutdown线程池后,计数为0,会返回成功
                    successCounter.incrementAndGet();
                }
            }
        }
        if (successCounter.get() != 0) {
            log.error("TransactionalWorker other thread execute error, create SystemException to rollback!");
            throw new SystemException("TransactionalWorker other thread execute error, create SystemException to rollback!");
        }
    }
}


@Slf4j
public class ThreadUtils {

    private ThreadUtils() {
    }

    /**
     * @param pool
     * @param awaitTerminationTimeout
     * @param timeUnit
     * @return 若是出现异常,则返回false
     */
    public static boolean shutdown(ExecutorService pool, int awaitTerminationTimeout, TimeUnit timeUnit) {
        try {
            pool.shutdown();
            boolean done = false;
            try {
                done = awaitTerminationTimeout > 0 && pool.awaitTermination(awaitTerminationTimeout, timeUnit);
            } catch (InterruptedException e) {
                log.error("thread pool awaitTermination error!", e);
            }
            if (!done) {
                pool.shutdownNow();
                if(awaitTerminationTimeout > 0) {
                    pool.awaitTermination(awaitTerminationTimeout, timeUnit);
                }
            }
        } catch (Exception e) {
            log.error("thread pool shutdown error!", e);
            return false;
        }
        return true;
    }
}

使用例子

  • 一样的业务,拆分多个线程来
return atomicConcurrentTransactionalExecutor.execute(10, 2,
  // 业务
  () -> testService.test1()
}
  • 不一样的业务,每一个线程操做不一样的业务
return atomicConcurrentTransactionalExecutor.execute(10,
  // 业务1
  () -> testService.test1(),
  // 业务2
  () -> testService.test2(),
}
注意在使用时,同一个类内,调用内部方法,Spring事务不生效的问题。 任务超时的便捷测试还须要严格再测试!
相关文章
相关标签/搜索