今天要讨论的是“Java实现多线程单条数据事务管理”,在此以前,顺便回顾一下实现多线程的几种方式html
第一种方法是继承Thread类,重写run()
方法多线程
public class TestThread extends Thread { public void run() { System.out.println("继承Thread类,重写run方法"); } }
使用时,new一个实例,执行start()
方法并发
TestThread testThread1 = new TestThread(); // 新建状态 TestThread testThread2 = new TestThread(); // 新建状态 testThread1.start(); // 就绪状态 testThread2.start(); // 就绪状态
什么时候执行取决于cpu调度app
由于Java“单继承、多实现”的特性,当咱们已经继承了一个类的时候,则没法再继承Thread类,此时能够经过实现Runnable接口的方式,实现run()
方法ide
public class TestThread extends FatherClass implements Runnable { public void run() { System.out.println("实现Runnable接口的方式,实现run方法"); } }
Thread类也是实现Runnable接口测试
使用时,须要首先实例化一个Thread,并传入本身的TestThread实例this
TestThread testThread = new TestThread(); Thread thread = new Thread(testThread); thread.start();
该方法区别于前两种的特色是:可以得到线程处理的结果。所以该方式适用于须要对线程的结果进行处理的场景线程
class TestCallable implements Callable<Integer> { @Override public Integer call() { int sum = 0; for (int i = 0; i < 100; i++) { System.out.println(Thread.currentThread().getName() + " " + i); sum += i; } return sum; } }
使用时,先建立TestCallable对象,而后使用FutureTask来包装MyCallable对象,再将FutureTask对象做为Thread对象的target建立新的线程,最后thread执行start()
方法,线程进入就绪状态code
Callable<Integer> testCallable = new TestCallable(); // 建立TestCallable对象 FutureTask<Integer> futureTask = new FutureTask<Integer>(testCallable); // 使用FutureTask来包装MyCallable对象 Thread thread = new Thread(futureTask); // FutureTask对象做为Thread对象的target建立新的线程 thread.start();
关于事务,能够看看另一篇文章谈谈Java事务htm
咱们有时会遇到这样的场景:要对大批量的数据进行更新或插入操做,须要开启多线程来提升效率,又但愿每一个线程在的处理一批数据时,可以对其中每条数据进行处理的时,作到出错时实现单条数据回滚,而不是全部数回滚(全部数据回滚后续讨论)。先看代码:
根据以上多线程知识,咱们先定义一个业务线程类以下:
public class TestTranstionalThread extends Thread { private List<BalBankDictEntity> balBankDictEntities; public TestTranstionalThread( List<BalBankDictEntity> balBankDictEntities){ this.balBankDictEntities = balBankDictEntities; } @Override public void run() { log.info("线程{}开始",Thread.currentThread().getName()); for (BalBankDictEntity balBankDictEntity : balBankDictEntities) { try{ collBillDao.insOneBank(balBankDictEntity); }catch (BusiException e){ log.error("{}回滚",balBankDictEntity.getBankId()); } } log.info("线程{}结束",Thread.currentThread().getName()); } }
insOneBank()
方法以下,注意的@Transactional
注解的事务隔离等级为:REQUIRES_NEW,建立一个新的事务。
@Transactional(propagation = Propagation.REQUIRES_NEW) public void insOneBank(BalBankDictEntity balBankDictEntity){ balBankDictMapper.insert(balBankDictEntity); /* 模拟发生异常,抛出异常,实现将已插入数据回滚 */ if (Integer.parseInt(balBankDictEntity.getBankId().substring(2)) % 100 == 0){ throw new BusiException("test"); } }
开启多线程进行业务处理,注意加上@Transactional
注解
@Transactional public void testTransactional(){ /* 模拟测试数据 */ List<BalBankDictEntity> balBankDictEntities = new ArrayList<>(); for (int i = 0 ; i < 100000 ; i ++){ BalBankDictEntity balBankDictEntity = new BalBankDictEntity(); balBankDictEntity.setBankCode("BK" + i); balBankDictEntity.setBankId("ID" + i + ""); balBankDictEntity.setBankName("N" + i + "N"); balBankDictEntities.add(balBankDictEntity); } int totalNum = balBankDictEntities.size(); log.info("totalNum" + totalNum); /* 分10个线程处理 */ ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10); int dealNum = totalNum % 10 == 0 ? totalNum / 10 : totalNum / 10 + 1; // 计算每一个线程处理的数量 for (int i = 1; i <= 10 ; i++ ){ List<BalBankDictEntity> balBankDictEntityList = splitDataList(balBankDictEntities,dealNum,10,i); // 切割数据集实现数据隔离 TestTranstionalThread testTranstional = new TestTranstionalThread(balBankDictEntityList); fixedThreadPool.execute(testTranstional); } }
最终实现多个线程并发插入数据,有异常的数据的单独回滚,不影响总体