有时在工做中,咱们须要将大量的数据持久化到数据库中,若是数据量很大的话直接插入的执行速度很是慢,而且因为插入操做也没有太多可以进行sql优化的地方,因此只能从程序代码的角度进行优化。因此本文将尝试使用几种不一样方式对插入操做进行优化,看看如何可以最大程度的缩短SQL执行时间。java
以插入1000条数据为例,首先进行数据准备,用于插入数据库测试:sql
private List<Order> prepareData(){ List<Order> orderList=new ArrayList<>(); for (int i = 1; i <= 1000; i++) { Order order=new Order(); order.setId(Long.valueOf(i)); order.setOrderNumber("A"); order.setMoney(100D); order.setTenantId(1L); orderList.add(order); } return orderList; }
首先测试直接插入1000条数据:数据库
public void noBatch() { List<Order> orderList = prepareData(); long startTime = System.currentTimeMillis(); for (Order order : orderList) { orderMapper.insert(order); } System.out.println("总耗时: " + (System.currentTimeMillis() - startTime) / 1000.0 + "s"); }
执行时间以下:服务器
接下来,使用mybatis-plus的批量查询,咱们本身的Service接口须要继承IService接口:mybatis
public interface SqlService extends IService<Order> { }
在实现类SqlServiceImpl中直接调用saveBatch方法:多线程
public void plusBatch() { List<Order> orderList = prepareData(); long startTime = System.currentTimeMillis(); saveBatch(orderList); System.out.println("总耗时: " + (System.currentTimeMillis() - startTime) / 1000.0 + "s"); }
执行代码,查看运行时间:app
能够发现,使用mybatis-plus的批量插入并无比循环单条插入显著缩短期,因此来查看一下saveBatch方法的源码:框架
@Transactional(rollbackFor = Exception.class) @Override public boolean saveBatch(Collection<T> entityList, int batchSize) { String sqlStatement = sqlStatement(SqlMethod.INSERT_ONE); return executeBatch(entityList, batchSize, (sqlSession, entity) -> sqlSession.insert(sqlStatement, entity)); }
其中调用了executeBatch方法:less
protected <E> boolean executeBatch(Collection<E> list, int batchSize, BiConsumer<SqlSession, E> consumer) { Assert.isFalse(batchSize < 1, "batchSize must not be less than one"); return !CollectionUtils.isEmpty(list) && executeBatch(sqlSession -> { int size = list.size(); int i = 1; for (E element : list) { consumer.accept(sqlSession, element); if ((i % batchSize == 0) || i == size) { sqlSession.flushStatements(); } i++; } }); }
在for循环中,consumer的accept执行的是sqlSession的insert操做,这一阶段都是对sql的拼接,只有到最后当for循环执行完成后,才会将数据批量刷新到数据库中。也就是说,以前咱们向数据库服务器发起了1000次请求,可是使用批量插入,只须要发起一次请求就能够了。若是抛出异常,则会进行回滚,不会向数据库中写入数据。可是虽然减小了数据库请求的次数,对于缩短执行时间并无显著的提高。ide
Stream是JAVA8中用于处理集合的关键抽象概念,能够进行复杂的查找、过滤、数据映射等操做。而并行流Parallel Stream,能够将整个数据内容分红多个数据块,并使用多个线程分别处理每一个数据块的流。在大量数据的插入操做中,不存在数据的依赖的耦合关系,所以能够进行拆分使用并行流进行插入。测试插入的代码以下:
public void stream(){ List<Order> orderList = prepareData(); long startTime = System.currentTimeMillis(); orderList.parallelStream().forEach(order->orderMapper.insert(order)); System.out.println("总耗时: " + (System.currentTimeMillis() - startTime) / 1000.0 + "s"); }
仍是先对上面的代码进行测试:
能够发现速度比以前快了不少,这是由于并行流底层使用了Fork/Join框架,具体来讲使用了“分而治之”的思想,对任务进行了拆分,使用不一样线程进行执行,最后汇总(对Fork/Join不熟悉的同窗能够回顾一下请求合并与分而治之
这篇文章,里面介绍了它的基础使用)。并行流在底层使用了ForkJoinPool线程池,从ForkJoinPool的默认构造函数中看出,它拥有的默认线程数量等于计算机的逻辑处理器数量:
public ForkJoinPool() { this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), defaultForkJoinWorkerThreadFactory, null, false); }
也就是说,若是咱们服务器是逻辑8核的话,那么就会有8个线程来同时执行插入操做,大大缩短了执行的时间。而且ForkJoinPool线程池为了提升任务的并行度和吞吐量,采用了任务窃取机制,可以进一步的缩短执行的时间。
在并行流中,建立的ForkJoinPool的线程数量是固定的,那么经过手动修改线程池中线程的数量,可否进一步的提升执行效率呢?通常而言,在线程池中,设置线程数量等于处理器数量就能够了,由于若是建立过多线程,线程频繁切换上下文也会额外消耗时间,反而会增长执行的整体时间。可是对于批量SQL的插入操做,没有复杂的业务处理逻辑,仅仅是须要频繁的与数据库进行交互,属于I/O密集型操做。而对于I/O密集型操做,程序中存在大量I/O等待占据时间,致使CPU使用率较低。因此咱们尝试增长线程数量,来看一下可否进一步缩短执行时间呢?
定义插入任务,由于不须要返回,直接继承RecursiveAction父类。size是每一个队列中包含的任务数量,在构造方法中传入,若是一个队列中的任务数量大于它那么就继续进行拆分,直到任务数量足够小:
public class BatchInsertTask<E> extends RecursiveAction { private List<E> list; private BaseMapper<E> mapper; private int size; public BatchInsertTask(List<E> list, BaseMapper<E> mapper, int size) { this.list = list; this.mapper = mapper; this.size = size; } @Override protected void compute() { if (list.size() <= size) { list.stream().forEach(item -> mapper.insert(item)); } else { int middle = list.size() / 2; List<E> left = list.subList(0, middle); List<E> right = list.subList(middle, list.size()); BatchInsertTask<E> leftTask = new BatchInsertTask<>(left, mapper, size); BatchInsertTask<E> rightTask = new BatchInsertTask<>(right, mapper, size); invokeAll(leftTask, rightTask); } } }
使用ForkJoinPool运行上面定义的任务,线程池中的线程数取CPU线程的2倍,将执行的SQL条数均分到每一个线程的执行队列中:
public class BatchSqlUtil { public static <E> void runSave(List<E> list, BaseMapper<E> mapper) { int processors = getProcessors(); ForkJoinPool forkJoinPool = new ForkJoinPool(processors); int size = (int) Math.ceil((double)list.size() / processors); BatchInsertTask<E> task = new BatchInsertTask<E>(list, mapper, size); forkJoinPool.invoke(task); } private static int getProcessors() { int processors = Runtime.getRuntime().availableProcessors(); return processors<<=1; } }
启动测试代码:
public void batch() { List<Order> orderList = prepareData(); long startTime = System.currentTimeMillis(); BatchSqlUtil.runSave(orderList,orderMapper); System.out.println("总耗时: " + (System.currentTimeMillis() - startTime) / 1000.0 + "s"); }
查看运行时间:
能够看到,经过增长ForkJoinPool中的线程,能够进一步的缩短批量插入的时间。
若是文章对您有所帮助,欢迎关注公众号 码农参上