springboot学习笔记:10.springboot+atomikos+mysql+mybatis+druid+分布式事务

前言

上一篇文章咱们整合了springboot+druid+mybatis+mysql+多数据源;css

本篇文章你们主要跟随大家涛兄在上一届基础上配置一下多数据源状况下的分布式事务;html

首先,到底啥是分布式事务呢,好比咱们在执行一个业务逻辑的时候有两步分别操做A数据源和B数据源,当咱们在A数据源执行数据更改后,在B数据源执行时出现运行时异常,那么咱们必需要让B数据源的操做回滚,并回滚对A数据源的操做;这种状况在支付业务时经常出现;好比买票业务在最后支付失败,那以前的操做必须所有回滚,若是以前的操做分布在多个数据源中,那么这就是典型的分布式事务回滚;(分布式事务详解参考)前端

了解了什么是分布式事务,那分布式事务在java的解决方案就是JTA(即Java Transaction API);springboot官方提供了 Atomikos or Bitronix解决思路java

其实,大多数状况下不少公司是使用消息队列的方式实现分布式事务,这个咱们后面的文章会单独写一篇使用MQ的方式实现分布式事务;mysql

本篇文章重点讲解springboot环境下,整合 Atomikos +mysql+mybatis+tomcat/jetty;
git

1、项目依赖

pom.xml中添加atomikos的springboot相关依赖:github

<!--分布式事务-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>

 

点进去会发现里面整合好了:transactions-jms、transactions-jta、transactions-jdbc、javax.transaction-apiweb

2、把数据源的相关配置项单独提炼到一个application.yml中:

注意:ajax

1.这回咱们的spring.datasource.type 是com.alibaba.druid.pool.xa.DruidXADataSource;spring

2.spring.jta.transaction-manager-id的值在你的电脑中是惟一的,这个详细请阅读官方文档;

 

3.要把以前在application-dev.properties中的spring.datasource.*全部相关配置注释掉;

完整的yml文件以下:

spring:
  datasource:
    type: com.alibaba.druid.pool.xa.DruidXADataSource
    druid:
    
      systemDB:
        name: systemDB
        url: jdbc:mysql://localhost:3306/springboot-mybatis?useUnicode=true&characterEncoding=utf-8
        username: root
        password: root
        # 下面为链接池的补充设置,应用到上面全部数据源中
        # 初始化大小,最小,最大
        initialSize: 5
        minIdle: 5
        maxActive: 20
        # 配置获取链接等待超时的时间
        maxWait: 60000
        # 配置间隔多久才进行一次检测,检测须要关闭的空闲链接,单位是毫秒
        timeBetweenEvictionRunsMillis: 60000
        # 配置一个链接在池中最小生存的时间,单位是毫秒
        minEvictableIdleTimeMillis: 30
        validationQuery: SELECT 1
        validationQueryTimeout: 10000
        testWhileIdle: true
        testOnBorrow: false
        testOnReturn: false
        # 打开PSCache,而且指定每一个链接上PSCache的大小
        poolPreparedStatements: true
        maxPoolPreparedStatementPerConnectionSize: 20
        filters: stat,wall
        # 经过connectProperties属性来打开mergeSql功能;慢SQL记录
        connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
        # 合并多个DruidDataSource的监控数据
        useGlobalDataSourceStat: true

      businessDB:
        name: businessDB

        url: jdbc:mysql://localhost:3306/springboot-mybatis2?useUnicode=true&characterEncoding=utf-8
        username: root
        password: root
        # 下面为链接池的补充设置,应用到上面全部数据源中
        # 初始化大小,最小,最大
        initialSize: 5
        minIdle: 5
        maxActive: 20
        # 配置获取链接等待超时的时间
        maxWait: 60000
        # 配置间隔多久才进行一次检测,检测须要关闭的空闲链接,单位是毫秒
        timeBetweenEvictionRunsMillis: 60000
        # 配置一个链接在池中最小生存的时间,单位是毫秒
        minEvictableIdleTimeMillis: 30
        validationQuery: SELECT 1
        validationQueryTimeout: 10000
        testWhileIdle: true
        testOnBorrow: false
        testOnReturn: false
        # 打开PSCache,而且指定每一个链接上PSCache的大小
        poolPreparedStatements: true
        maxPoolPreparedStatementPerConnectionSize: 20
        filters: stat,wall
        # 经过connectProperties属性来打开mergeSql功能;慢SQL记录
        connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
        # 合并多个DruidDataSource的监控数据
        useGlobalDataSourceStat: true

  #jta相关参数配置
  jta:
    log-dir: classpath:tx-logs
    transaction-manager-id: txManager

 

 

3、在DruidConfig.java中实现多个数据源的注册;分布式事务管理器的注册;druid的注册;

package com.zjt.config;

import com.alibaba.druid.filter.stat.StatFilter;
import com.alibaba.druid.support.http.StatViewServlet;
import com.alibaba.druid.support.http.WebStatFilter;
import com.alibaba.druid.wall.WallConfig;
import com.alibaba.druid.wall.WallFilter;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.boot.web.servlet.ServletRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.sql.DataSource;
import javax.transaction.UserTransaction;
import java.util.Properties;

/**
 * Druid配置
 *
 * @author zhaojiatao
 */
@Configuration
public class DruidConfig {
    @Bean(name = "systemDataSource")
    @Primary
    @Autowired
    public DataSource systemDataSource(Environment env) {
        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        Properties prop = build(env, "spring.datasource.druid.systemDB.");
        ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
        ds.setUniqueResourceName("systemDB");
        ds.setPoolSize(5);
        ds.setXaProperties(prop);
        return ds;

    }

    @Autowired
    @Bean(name = "businessDataSource")
    public AtomikosDataSourceBean businessDataSource(Environment env) {

        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        Properties prop = build(env, "spring.datasource.druid.businessDB.");
        ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
        ds.setUniqueResourceName("businessDB");
        ds.setPoolSize(5);
        ds.setXaProperties(prop);

        return ds;
    }


    /** * 注入事物管理器 * @return
     */ @Bean(name = "xatx") public JtaTransactionManager regTransactionManager () { UserTransactionManager userTransactionManager = new UserTransactionManager(); UserTransaction userTransaction = new UserTransactionImp(); return new JtaTransactionManager(userTransaction, userTransactionManager); } private Properties build(Environment env, String prefix) {

        Properties prop = new Properties();
        prop.put("url", env.getProperty(prefix + "url"));
        prop.put("username", env.getProperty(prefix + "username"));
        prop.put("password", env.getProperty(prefix + "password"));
        prop.put("driverClassName", env.getProperty(prefix + "driverClassName", ""));
        prop.put("initialSize", env.getProperty(prefix + "initialSize", Integer.class));
        prop.put("maxActive", env.getProperty(prefix + "maxActive", Integer.class));
        prop.put("minIdle", env.getProperty(prefix + "minIdle", Integer.class));
        prop.put("maxWait", env.getProperty(prefix + "maxWait", Integer.class));
        prop.put("poolPreparedStatements", env.getProperty(prefix + "poolPreparedStatements", Boolean.class));

        prop.put("maxPoolPreparedStatementPerConnectionSize",
                env.getProperty(prefix + "maxPoolPreparedStatementPerConnectionSize", Integer.class));

        prop.put("maxPoolPreparedStatementPerConnectionSize",
                env.getProperty(prefix + "maxPoolPreparedStatementPerConnectionSize", Integer.class));
        prop.put("validationQuery", env.getProperty(prefix + "validationQuery"));
        prop.put("validationQueryTimeout", env.getProperty(prefix + "validationQueryTimeout", Integer.class));
        prop.put("testOnBorrow", env.getProperty(prefix + "testOnBorrow", Boolean.class));
        prop.put("testOnReturn", env.getProperty(prefix + "testOnReturn", Boolean.class));
        prop.put("testWhileIdle", env.getProperty(prefix + "testWhileIdle", Boolean.class));
        prop.put("timeBetweenEvictionRunsMillis",
                env.getProperty(prefix + "timeBetweenEvictionRunsMillis", Integer.class));
        prop.put("minEvictableIdleTimeMillis", env.getProperty(prefix + "minEvictableIdleTimeMillis", Integer.class));
        prop.put("filters", env.getProperty(prefix + "filters"));

        return prop;
    }










    @Bean
    public ServletRegistrationBean druidServlet() {
        ServletRegistrationBean servletRegistrationBean = new ServletRegistrationBean(new StatViewServlet(), "/druid/*");

        //控制台管理用户,加入下面2行 进入druid后台就须要登陆
        //servletRegistrationBean.addInitParameter("loginUsername", "admin");
        //servletRegistrationBean.addInitParameter("loginPassword", "admin");
        return servletRegistrationBean;
    }

    @Bean
    public FilterRegistrationBean filterRegistrationBean() {
        FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean();
        filterRegistrationBean.setFilter(new WebStatFilter());
        filterRegistrationBean.addUrlPatterns("/*");
        filterRegistrationBean.addInitParameter("exclusions", "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*");
        filterRegistrationBean.addInitParameter("profileEnable", "true");
        return filterRegistrationBean;
    }

    @Bean
    public StatFilter statFilter(){
        StatFilter statFilter = new StatFilter();
        statFilter.setLogSlowSql(true); //slowSqlMillis用来配置SQL慢的标准,执行时间超过slowSqlMillis的就是慢。
        statFilter.setMergeSql(true); //SQL合并配置
        statFilter.setSlowSqlMillis(1000);//slowSqlMillis的缺省值为3000,也就是3秒。
        return statFilter;
    }

    @Bean
    public WallFilter wallFilter(){
        WallFilter wallFilter = new WallFilter();
        //容许执行多条SQL
        WallConfig config = new WallConfig();
        config.setMultiStatementAllow(true);
        wallFilter.setConfig(config);
        return wallFilter;
    }




}

 

 4、分别配置每一个数据源对应的sqlSessionFactory,以及MapperScan扫描的包:

MybatisDatasourceConfig.java

package com.zjt.config;

import com.zjt.util.MyMapper;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;

import javax.sql.DataSource;

/**
 * @author <a href="zhaojiatao"></a>
 * @version 1.0, 2017/11/24
 * @description
 */
@Configuration
// 精确到 mapper 目录,以便跟其余数据源隔离
@MapperScan(basePackages = "com.zjt.mapper", markerInterface = MyMapper.class, sqlSessionFactoryRef = "sqlSessionFactory")
public class MybatisDatasourceConfig {

    @Autowired
    @Qualifier("systemDataSource")
    private DataSource ds;

    @Bean
    public SqlSessionFactory sqlSessionFactory() throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(ds);
        //指定mapper xml目录
        ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        factoryBean.setMapperLocations(resolver.getResources("classpath:mapper/*.xml"));
        return factoryBean.getObject();

    }

    @Bean
    public SqlSessionTemplate sqlSessionTemplate() throws Exception {
        SqlSessionTemplate template = new SqlSessionTemplate(sqlSessionFactory()); // 使用上面配置的Factory
        return template;
    }

    //关于事务管理器,不论是JPA仍是JDBC等都实现自接口 PlatformTransactionManager
    // 若是你添加的是 spring-boot-starter-jdbc 依赖,框架会默认注入 DataSourceTransactionManager 实例。
    //在Spring容器中,咱们手工注解@Bean 将被优先加载,框架不会从新实例化其余的 PlatformTransactionManager 实现类。
    /*@Bean(name = "transactionManager")
    @Primary
    public DataSourceTransactionManager masterTransactionManager() {
        //MyBatis自动参与到spring事务管理中,无需额外配置,只要org.mybatis.spring.SqlSessionFactoryBean引用的数据源
        // 与DataSourceTransactionManager引用的数据源一致便可,不然事务管理会不起做用。
        return new DataSourceTransactionManager(ds);

    }*/

}

 

 MybatisDatasource2Config.java

package com.zjt.config;

import com.zjt.util.MyMapper;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;

import javax.sql.DataSource;

/**
 * @author <a href="zhaojiatao"></a>
 * @version 1.0, 2017/11/24
 * @description
 */
@Configuration
// 精确到 mapper 目录,以便跟其余数据源隔离
@MapperScan(basePackages = "com.zjt.mapper2", markerInterface = MyMapper.class, sqlSessionFactoryRef = "sqlSessionFactory2")
public class MybatisDatasource2Config {

    @Autowired
    @Qualifier("businessDataSource")
    private DataSource ds;

    @Bean
    public SqlSessionFactory sqlSessionFactory2() throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(ds);
        //指定mapper xml目录
        ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        factoryBean.setMapperLocations(resolver.getResources("classpath:mapper2/*.xml"));
        return factoryBean.getObject();

    }

    @Bean
    public SqlSessionTemplate sqlSessionTemplate2() throws Exception {
        SqlSessionTemplate template = new SqlSessionTemplate(sqlSessionFactory2()); // 使用上面配置的Factory
        return template;
    }

    //关于事务管理器,不论是JPA仍是JDBC等都实现自接口 PlatformTransactionManager
    // 若是你添加的是 spring-boot-starter-jdbc 依赖,框架会默认注入 DataSourceTransactionManager 实例。
    //在Spring容器中,咱们手工注解@Bean 将被优先加载,框架不会从新实例化其余的 PlatformTransactionManager 实现类。
    /*@Bean(name = "transactionManager2")
    @Primary
    public DataSourceTransactionManager masterTransactionManager() {
        //MyBatis自动参与到spring事务管理中,无需额外配置,只要org.mybatis.spring.SqlSessionFactoryBean引用的数据源
        // 与DataSourceTransactionManager引用的数据源一致便可,不然事务管理会不起做用。
        return new DataSourceTransactionManager(ds);
    }*/

}

 

 4、因为咱们本例中只使用一个事务管理器:xatx,故就不在使用TxAdviceInterceptor.java和TxAdvice2Interceptor.java中配置的事务管理器了;有需求的童鞋能够本身配置其余的事务管理器;

5、新建分布式业务测试接口JtaTestService.java和实现类JtaTestServiceImpl.java

其实就是一个很简单的test01()方法,在该方法中咱们分别前后调用classService.saveOrUpdateTClass(tClass);和teacherService.saveOrUpdateTeacher(teacher);

实现前后操做两个数据源:而后咱们能够本身debug跟踪事务的提交时机,此外,也能够在在两个方法全执行结束以后,手动制造一个运行时异常,来检查分布式事务是否所有回滚;

注意:

在实现类的方法中我使用的是:

@Transactional(transactionManager = "xatx", propagation = Propagation.REQUIRED, rollbackFor = { java.lang.RuntimeException.class })

从而指定了使用哪一个事务管理器,事务隔离级别(通常都用我这个默认的),回滚的条件(通常可使用Exception),这三个能够本身根据业务实际修改;

package com.zjt.service3;

import java.util.Map;

public interface JtaTestService {

    public Map<String,Object> test01();

}
package com.zjt.service3.impl;


import com.zjt.entity.TClass;
import com.zjt.entity.Teacher;
import com.zjt.service.TClassService;
import com.zjt.service2.TeacherService;
import com.zjt.service3.JtaTestService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.util.LinkedHashMap;
import java.util.Map;

@Service("jtaTestServiceImpl")
public class JtaTestServiceImpl implements JtaTestService{

    @Autowired
    @Qualifier("teacherServiceImpl")
    private TeacherService teacherService;
    @Autowired
    @Qualifier("tclassServiceImpl")
    private TClassService tclassService;

    @Override
    @Transactional(transactionManager = "xatx", propagation = Propagation.REQUIRED, rollbackFor = { java.lang.RuntimeException.class })  public Map<String, Object> test01() {
        LinkedHashMap<String,Object> resultMap=new LinkedHashMap<String,Object>();
        TClass tClass=new TClass();
        tClass.setName("8888");
        tclassService.saveOrUpdateTClass(tClass);

        Teacher teacher=new Teacher();
        teacher.setName("8888");
        teacherService.saveOrUpdateTeacher(teacher);

        System.out.println(1/0);

        resultMap.put("state","success");
        resultMap.put("message","分布式事务同步成功");
        return resultMap;
    }
}

 

6、创建JtaTestContoller.java,接受一个来自前端的http请求,触发JtaTestService 的test01方法:

package com.zjt.web;


import com.zjt.service3.JtaTestService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import java.util.LinkedHashMap;
import java.util.Map;

@Controller
@RequestMapping("/jtaTest")
public class JtaTestContoller {

    @Autowired
    @Qualifier("jtaTestServiceImpl")
    private JtaTestService taTestService;



    @ResponseBody
    @RequestMapping("/test01")
    public Map<String,Object> test01(){
        LinkedHashMap<String,Object> resultMap=new LinkedHashMap<String,Object>();
        try {
            return taTestService.test01();
        }catch (Exception e){
            resultMap.put("state","fail");
            resultMap.put("message","分布式事务同步失败");
            return resultMap;
        }
    }






}

 

 

 7、在test.ftl中增长一个按钮来测试;

//分布式事务测试
        $("#JTATest").click(function(){
            $.ajax({
                type: "POST",
                url: "${basePath!}/jtaTest/test01",
                data: {}    ,
                async: false,
                error: function (request) {
                    layer.alert("与服务器链接失败/(ㄒoㄒ)/~~");
                    return false;
                },
                success: function (data) {
                    if (data.state == 'fail') {
                        layer.alert(data.message);
                        return false;
                    }else if(data.state == 'success'){
                        layer.alert(data.message);
                    }
                }
            });
        });




<button class="layui-btn" id="JTATest">同时向班级和老师表插入名为8888的班级和老师</button>

 

 

8、启动服务,验证结果:

 

 点击这个按钮,跳转到controller:

当正常执行了sql语句以后,咱们能够发现数据库并无变化,由于整个方法的事务尚未走完,当咱们走到1/0这步时:

抛出运行时异常,并被spring事务拦截器拦截,并捕获异常:

在this.completeTransactionAfterThrowing(txInfo, var16);方法中会将事务所有回滚:

22:09:04.243 logback [http-nio-8080-exec-5] INFO c.a.i.imp.CompositeTransactionImp - rollback() done of transaction 192.168.1.103.tm0000400006

 

此时,当咱们再次打开数据库验证,依旧没有变化,证实分布式事务配置成功;

你们能够基于个人代码本身练习一下,本身尝试着使用多事务管理器的状况下的灵活配置;

 9、后记:

 

 

本文源代码:https://github.com/zhaojiatao/springboot-zjt-chapter10-springboot-atomikos-mysql-mybatis-druid.git

 

代码在tomcat和jetty环境下都可完成事务回滚;

在事务回滚时可能报一个Transactional not active的警告,我google后,老外也说不出这个具体做用,大部分人认为这只是一个警告,能够忽略;你们谁懂得或者仔细翻阅源代码后懂得的,能够告诉我一下。谢谢

相关文章
相关标签/搜索