摘要: 原创出处 http://www.iocoder.cn/TCC-Transaction/transaction-repository/ 「芋道源码」欢迎转载,保留摘要,谢谢!java
本文主要基于 TCC-Transaction 1.2.3.3 正式版git
🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:github
- RocketMQ / MyCAT / Sharding-JDBC 全部源码分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
- 您对于源码的疑问每条留言都将获得认真回复。甚至不知道如何读源码也能够请教噢。
- 新的源码解析文章实时收到通知。每周更新一篇左右。
- 认真的源码交流微信群。
本文分享 事务存储器。主要涉及以下 Maven 项目:redis
tcc-transaction-core
:tcc-transaction 底层实现。在 TCC 的过程当中,根据应用内存中的事务信息完成整个事务流程。But 实际业务场景中,将事务信息只放在应用内存中是远远不够可靠的。例如:数据库
所以,TCC-Transaction 将事务信息添加到内存中的同时,会使用外部存储进行持久化。目前提供四种外部存储:apache
本文涉及到的类关系以下图( 打开大图 ):数组
你行好事会由于获得赞扬而愉悦
同理,开源项目贡献者会由于 Star 而更加有动力
为 TCC-Transaction 点赞!传送门缓存
ps:笔者假设你已经阅读过《tcc-transaction 官方文档 —— 使用指南1.2.x》。安全
在《TCC-Transaction 源码分析 —— TCC 实现》「4. 事务与参与者」,能够看到 Transaction 是一个比较复杂的对象,内嵌 Participant 数组,而 Participant 自己也是复杂的对象,内嵌了更多的其余对象,所以,存储器在持久化 Transaction 时,须要序列化后才能存储。服务器
org.mengyun.tcctransaction.serializer.ObjectSerializer
,对象序列化接口。实现代码以下:
public interface ObjectSerializer<T> {
byte[] serialize(T t);
T deserialize(byte[] bytes);
}
复制代码
目前提供 JDK自带序列化 和 Kyro序列化 两种实现。
org.mengyun.tcctransaction.serializer.JdkSerializationSerializer
,JDK 序列化实现。比较易懂,点击连接直接查看。
TCC-Transaction 使用的默认的序列化。
org.mengyun.tcctransaction.serializer.KryoTransactionSerializer
,Kyro 序列化实现。比较易懂,点击连接直接查看。
JDK 和 Kyro 的序列化实现,肉眼没法直观具体存储事务的信息,你能够经过实现 ObjectSerializer 接口,实现自定义的 JSON 序列化。
org.mengyun.tcctransaction.TransactionRepository
,事务存储器接口。实现代码以下:
public interface TransactionRepository {
/** * 新增事务 * * @param transaction 事务 * @return 新增数量 */
int create(Transaction transaction);
/** * 更新事务 * * @param transaction 事务 * @return 更新数量 */
int update(Transaction transaction);
/** * 删除事务 * * @param transaction 事务 * @return 删除数量 */
int delete(Transaction transaction);
/** * 获取事务 * * @param xid 事务编号 * @return 事务 */
Transaction findByXid(TransactionXid xid);
/** * 获取超过指定时间的事务集合 * * @param date 指定时间 * @return 事务集合 */
List<Transaction> findAllUnmodifiedSince(Date date);
}
复制代码
不一样的存储器经过实现该接口,提供事务的增删改查功能。
org.mengyun.tcctransaction.repository.CachableTransactionRepository
,可缓存的事务存储器抽象类,实现增删改查事务时,同时缓存事务信息。在上面类图,咱们也能够看到 TCC-Transaction 自带的多种存储器都继承该抽象类。
CachableTransactionRepository 构造方法实现代码以下:
public abstract class CachableTransactionRepository implements TransactionRepository {
/** * 缓存过时时间 */
private int expireDuration = 120;
/** * 缓存 */
private Cache<Xid, Transaction> transactionXidCompensableTransactionCache;
public CachableTransactionRepository() {
transactionXidCompensableTransactionCache = CacheBuilder.newBuilder().expireAfterAccess(expireDuration, TimeUnit.SECONDS).maximumSize(1000).build();
}
}
复制代码
#create(...)
实现代码以下:
@Override
public int create(Transaction transaction) {
int result = doCreate(transaction);
if (result > 0) {
putToCache(transaction);
}
return result;
}
/** * 添加到缓存 * * @param transaction 事务 */
protected void putToCache(Transaction transaction) {
transactionXidCompensableTransactionCache.put(transaction.getXid(), transaction);
}
/** * 新增事务 * * @param transaction 事务 * @return 新增数量 */
protected abstract int doCreate(Transaction transaction);
复制代码
#doCreate(...)
方法,新增事务。新增成功后,调用 #putToCache(...)
方法,添加事务到缓存。#doCreate(...)
为抽象方法,子类实现该方法,提供新增事务功能。#update(...)
实现代码以下:
@Override
public int update(Transaction transaction) {
int result = 0;
try {
result = doUpdate(transaction);
if (result > 0) {
putToCache(transaction);
} else {
throw new OptimisticLockException();
}
} finally {
if (result <= 0) { // 更新失败,移除缓存。下次访问,从存储器读取
removeFromCache(transaction);
}
}
return result;
}
/** * 移除事务从缓存 * * @param transaction 事务 */
protected void removeFromCache(Transaction transaction) {
transactionXidCompensableTransactionCache.invalidate(transaction.getXid());
}
/** * 更新事务 * * @param transaction 事务 * @return 更新数量 */
protected abstract int doUpdate(Transaction transaction);
复制代码
#doUpdate(...)
方法,更新事务。
#putToCache(...)
方法,添加事务到缓存。Transaction.version
)和存储器里的事务的版本号不一样,更新失败。为何?在《TCC-Transaction 源码分析 —— 事务恢复》详细解析。更新失败,意味着缓存已经不不一致,调用 #removeFromCache(...)
方法,移除事务从缓存中。#doUpdate(...)
为抽象方法,子类实现该方法,提供更新事务功能。#delete(...)
实现代码以下:
@Override
public int delete(Transaction transaction) {
int result;
try {
result = doDelete(transaction);
} finally {
removeFromCache(transaction);
}
return result;
}
/** * 删除事务 * * @param transaction 事务 * @return 删除数量 */
protected abstract int doDelete(Transaction transaction);
复制代码
#doDelete(...)
方法,删除事务。#removeFromCache(...)
方法,移除事务从缓存中。#doDelete(...)
为抽象方法,子类实现该方法,提供删除事务功能。#findByXid(...)
实现代码以下:
@Override
public Transaction findByXid(TransactionXid transactionXid) {
Transaction transaction = findFromCache(transactionXid);
if (transaction == null) {
transaction = doFindOne(transactionXid);
if (transaction != null) {
putToCache(transaction);
}
}
return transaction;
}
/** * 得到事务从缓存中 * * @param transactionXid 事务编号 * @return 事务 */
protected Transaction findFromCache(TransactionXid transactionXid) {
return transactionXidCompensableTransactionCache.getIfPresent(transactionXid);
}
/** * 查询事务 * * @param xid 事务编号 * @return 事务 */
protected abstract Transaction doFindOne(Xid xid);
复制代码
#findFromCache()
方法,优先从缓存中获取事务。#doFindOne()
方法,缓存中事务不存在,从存储器中获取。获取到后,调用 #putToCache()
方法,添加事务到缓存中。#doFindOne(...)
为抽象方法,子类实现该方法,提供查询事务功能。#findAllUnmodifiedSince(...)
实现代码以下:
@Override
public List<Transaction> findAllUnmodifiedSince(Date date) {
List<Transaction> transactions = doFindAllUnmodifiedSince(date);
// 添加到缓存
for (Transaction transaction : transactions) {
putToCache(transaction);
}
return transactions;
}
/** * 获取超过指定时间的事务集合 * * @param date 指定时间 * @return 事务集合 */
protected abstract List<Transaction> doFindAllUnmodifiedSince(Date date);
复制代码
#findAllUnmodifiedSince(...)
方法,从存储器获取超过指定时间的事务集合。调用 #putToCache(...)
方法,循环事务集合添加到缓存。#doFindAllUnmodifiedSince(...)
为抽象方法,子类实现该方法,提供获取超过指定时间的事务集合功能。org.mengyun.tcctransaction.repository.JdbcTransactionRepository
,JDBC 事务存储器,经过 JDBC 驱动,将 Transaction 存储到 MySQL / Oracle / PostgreSQL / SQLServer 等关系数据库。实现代码以下:
public class JdbcTransactionRepository extends CachableTransactionRepository {
/** * 领域 */
private String domain;
/** * 表后缀 */
private String tbSuffix;
/** * 数据源 */
private DataSource dataSource;
/** * 序列化 */
private ObjectSerializer serializer = new JdkSerializationSerializer();
}
复制代码
domain
,领域,或者也能够称为模块名,应用名,用于惟一标识一个资源。例如,Maven 模块 xxx-order
,咱们能够配置该属性为 ORDER
。tbSuffix
,表后缀。默认存储表名为 TCC_TRANSACTION
,配置表名后,为 TCC_TRANSACTION${tbSuffix}
。dataSource
,存储数据的数据源。serializer
,序列化。**当数据库里已经有数据的状况下,不要更换别的序列化,不然会致使反序列化报错。**建议:TCC-Transaction 存储时,新增字段,记录序列化的方式。表结构以下:
CREATE TABLE `TCC_TRANSACTION` (
`TRANSACTION_ID` int(11) NOT NULL AUTO_INCREMENT,
`DOMAIN` varchar(100) DEFAULT NULL,
`GLOBAL_TX_ID` varbinary(32) NOT NULL,
`BRANCH_QUALIFIER` varbinary(32) NOT NULL,
`CONTENT` varbinary(8000) DEFAULT NULL,
`STATUS` int(11) DEFAULT NULL,
`TRANSACTION_TYPE` int(11) DEFAULT NULL,
`RETRIED_COUNT` int(11) DEFAULT NULL,
`CREATE_TIME` datetime DEFAULT NULL,
`LAST_UPDATE_TIME` datetime DEFAULT NULL,
`VERSION` int(11) DEFAULT NULL,
PRIMARY KEY (`TRANSACTION_ID`), UNIQUE KEY `UX_TX_BQ` (`GLOBAL_TX_ID`,`BRANCH_QUALIFIER`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
复制代码
TRANSACTION_ID
,仅仅数据库自增,无实际用途。CONTENT
,Transaction 序列化。ps:点击连接查看 JdbcTransactionRepository 代码实现,已经添加完整中文注释。
org.mengyun.tcctransaction.repository.RedisTransactionRepository
,Redis 事务存储器,将 Transaction 存储到 Redis。实现代码以下:
public class RedisTransactionRepository extends CachableTransactionRepository {
/** * Jedis Pool */
private JedisPool jedisPool;
/** * key 前缀 */
private String keyPrefix = "TCC:";
/** * 序列化 */
private ObjectSerializer serializer = new JdkSerializationSerializer();
}
复制代码
keyPrefix
,key 前缀。相似 JdbcTransactionRepository 的 domain
属性。一个事务存储到 Reids,使用 Redis 的数据结构为 HASHES。
key : 使用 keyPrefix
+ xid
,实现代码以下:
/** * 建立事务的 Redis Key * * @param keyPrefix key 前缀 * @param xid 事务 * @return Redis Key */
public static byte[] getRedisKey(String keyPrefix, Xid xid) {
byte[] prefix = keyPrefix.getBytes();
byte[] globalTransactionId = xid.getGlobalTransactionId();
byte[] branchQualifier = xid.getBranchQualifier();
// 拼接 key
byte[] key = new byte[prefix.length + globalTransactionId.length + branchQualifier.length];
System.arraycopy(prefix, 0, key, 0, prefix.length);
System.arraycopy(globalTransactionId, 0, key, prefix.length, globalTransactionId.length);
System.arraycopy(branchQualifier, 0, key, prefix.length + globalTransactionId.length, branchQualifier.length);
return key;
}
复制代码
HASHES 的 key :使用 version
。
HASHES 的 value :调用 TransactionSerializer#serialize(...)
方法,序列化 Transaction。实现代码以下:
public static byte[] serialize(ObjectSerializer serializer, Transaction transaction) {
Map<String, Object> map = new HashMap<String, Object>();
map.put("GLOBAL_TX_ID", transaction.getXid().getGlobalTransactionId());
map.put("BRANCH_QUALIFIER", transaction.getXid().getBranchQualifier());
map.put("STATUS", transaction.getStatus().getId());
map.put("TRANSACTION_TYPE", transaction.getTransactionType().getId());
map.put("RETRIED_COUNT", transaction.getRetriedCount());
map.put("CREATE_TIME", transaction.getCreateTime());
map.put("LAST_UPDATE_TIME", transaction.getLastUpdateTime());
map.put("VERSION", transaction.getVersion());
// 序列化
map.put("CONTENT", serializer.serialize(transaction));
return serializer.serialize(map);
}
复制代码
在实现 #doFindAllUnmodifiedSince(date)
方法,没法像数据库使用时间条件进行过滤,所以,加载全部事务后在内存中过滤。实现代码以下:
@Override
protected List<Transaction> doFindAllUnmodifiedSince(Date date) {
// 得到全部事务
List<Transaction> allTransactions = doFindAll();
// 过滤时间
List<Transaction> allUnmodifiedSince = new ArrayList<Transaction>();
for (Transaction transaction : allTransactions) {
if (transaction.getLastUpdateTime().compareTo(date) < 0) {
allUnmodifiedSince.add(transaction);
}
}
return allUnmodifiedSince;
}
复制代码
ps:点击连接查看 RedisTransactionRepository 代码实现,已经添加完整中文注释。
FROM 《TCC-Transaction 官方文档 —— 使用指南1.2.x》
使用 RedisTransactionRepository 须要配置 Redis 服务器以下:
appendonly yes
appendfsync always
org.mengyun.tcctransaction.repository.ZooKeeperTransactionRepository
,Zookeeper 事务存储器,将 Transaction 存储到 Zookeeper。实现代码以下:
public class ZooKeeperTransactionRepository extends CachableTransactionRepository {
/** * Zookeeper 服务器地址数组 */
private String zkServers;
/** * Zookeeper 超时时间 */
private int zkTimeout;
/** * TCC 存储 Zookeeper 根目录 */
private String zkRootPath = "/tcc";
/** * Zookeeper 链接 */
private volatile ZooKeeper zk;
/** * 序列化 */
private ObjectSerializer serializer = new JdkSerializationSerializer();
}
复制代码
zkRootPath
,存储 Zookeeper 根目录,相似 JdbcTransactionRepository 的 domain
属性。一个事务存储到 Zookeeper,使用 Zookeeper 的持久数据节点。
path:${zkRootPath}
+ /
+ ${xid}
。实现代码以下:
// ZooKeeperTransactionRepository.java
private String getTxidPath(Xid xid) {
return String.format("%s/%s", zkRootPath, xid);
}
// TransactionXid.java
@Override
public String toString() {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("globalTransactionId:").append(UUID.nameUUIDFromBytes(globalTransactionId).toString());
stringBuilder.append(",").append("branchQualifier:").append(UUID.nameUUIDFromBytes(branchQualifier).toString());
return stringBuilder.toString();
}
复制代码
data:调用 TransactionSerializer#serialize(...)
方法,序列化 Transaction。
version:使用 Zookeeper 数据节点自带版本功能。这里要注意下,Transaction 的版本从 1 开始,而 Zookeeper 数据节点版本从 0 开始。
ps:点击连接查看 ZooKeeperTransactionRepository 代码实现,已经添加完整中文注释。
另外,在生产上暂时不建议使用 ZooKeeperTransactionRepository,缘由有两点:
若是你要使用 Zookeeper 进行事务的存储,能够考虑使用 Apache Curator 操做 Zookeeper,重写 ZooKeeperTransactionRepository 部分代码。
org.mengyun.tcctransaction.repository.FileSystemTransactionRepository
,File 事务存储器,将 Transaction 存储到文件系统。
实现上和 ZooKeeperTransactionRepository,区别主要在于不支持乐观锁更新。有兴趣的同窗点击连接查看,这里就不拓展开来。
另外,在生产上不建议使用 FileSystemTransactionRepository,由于不支持多节点共享。用分布式存储挂载文件另说,固然仍是不建议,由于不支持乐观锁并发更新。
这篇略( 超 )微( 级 )水更,哈哈哈,为《TCC-Transaction 源码分析 —— 事务恢复》作铺垫啦。
使用 RedisTransactionRepository 和 ZooKeeperTransactionRepository 存储事务仍是 Get 蛮多点的。
胖友,分享一个朋友圈可好?