今天的博客有点多,由于前几天一直用笔记录,今天都补上了。后面的博客先停一段时间,后面还有dubbo、storm、kafka、solor、nginx、keepalived、fastdfs等内容,只是由于最近准备跳槽,停更一段时间,等到新公司后再继续更新。java
场景1:支付宝转1w到余额宝,支付宝扣了1w,服务挂了怎么办?余额尚未加上nginx
场景2:订单系统和库存系统如何保持一致服务器
若是是本地的话很好解决网络
那若是是跨系统的呢?该如何解决?并发
有一种思路是这样的:分布式
可是这个有个问题,那就是性能很受影响,主要卡在事务协调器这里。ide
RocketMQ的实现方式以下(图片来自网络):性能
支付宝先生成 扣款信息 --> 消息队列 --> 余额宝消费消息atom
发送消息:spa
1 import com.alibaba.rocketmq.client.exception.MQClientException; 2 import com.alibaba.rocketmq.client.producer.SendResult; 3 import com.alibaba.rocketmq.client.producer.TransactionCheckListener; 4 import com.alibaba.rocketmq.client.producer.TransactionMQProducer; 5 import com.alibaba.rocketmq.common.message.Message; 6 7 8 /** 9 * 发送事务消息例子 10 * 11 */ 12 public class TransactionProducer { 13 public static void main(String[] args) throws MQClientException, InterruptedException { 14 15 TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); 16 TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); 17 // 事务回查最小并发数 18 producer.setCheckThreadPoolMinSize(2); 19 // 事务回查最大并发数 20 producer.setCheckThreadPoolMaxSize(2); 21 // 队列数 22 producer.setCheckRequestHoldMax(2000); 23 producer.setTransactionCheckListener(transactionCheckListener); 24 producer.start(); 25 26 String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE" }; 27 TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); 28 for (int i = 0; i < 100; i++) { 29 try { 30 Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, 31 ("Hello RocketMQ " + i).getBytes()); 32 SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); 33 System.out.println(sendResult); 34 35 Thread.sleep(10); 36 } 37 catch (MQClientException e) { 38 e.printStackTrace(); 39 } 40 } 41 42 for (int i = 0; i < 100000; i++) { 43 Thread.sleep(1000); 44 } 45 46 producer.shutdown(); 47 48 } 49 }
执行本地事务
1 import java.util.concurrent.atomic.AtomicInteger; 2 3 import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter; 4 import com.alibaba.rocketmq.client.producer.LocalTransactionState; 5 import com.alibaba.rocketmq.common.message.Message; 6 7 8 /** 9 * 执行本地事务 10 */ 11 public class TransactionExecuterImpl implements LocalTransactionExecuter { 12 private AtomicInteger transactionIndex = new AtomicInteger(1); 13 14 15 @Override 16 public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) { 17 int value = transactionIndex.getAndIncrement(); 18 19 if (value == 0) { 20 throw new RuntimeException("Could not find db"); 21 } 22 else if ((value % 5) == 0) { 23 return LocalTransactionState.ROLLBACK_MESSAGE; 24 } 25 else if ((value % 4) == 0) { 26 return LocalTransactionState.COMMIT_MESSAGE; 27 } 28 29 return LocalTransactionState.UNKNOW; 30 } 31 }
服务器回查客户端(这个功能在开源版本中已经被咔掉了,可是咱们仍是要写,否则报错)
1 import java.util.concurrent.atomic.AtomicInteger; 2 3 import com.alibaba.rocketmq.client.producer.LocalTransactionState; 4 import com.alibaba.rocketmq.client.producer.TransactionCheckListener; 5 import com.alibaba.rocketmq.common.message.MessageExt; 6 7 8 /** 9 * 未决事务,服务器回查客户端 10 */ 11 public class TransactionCheckListenerImpl implements TransactionCheckListener { 12 private AtomicInteger transactionIndex = new AtomicInteger(0); 13 14 15 @Override 16 public LocalTransactionState checkLocalTransactionState(MessageExt msg) { 17 System.out.println("server checking TrMsg " + msg.toString()); 18 19 int value = transactionIndex.getAndIncrement(); 20 if ((value % 6) == 0) { 21 throw new RuntimeException("Could not find db"); 22 } 23 else if ((value % 5) == 0) { 24 return LocalTransactionState.ROLLBACK_MESSAGE; 25 } 26 else if ((value % 4) == 0) { 27 return LocalTransactionState.COMMIT_MESSAGE; 28 } 29 30 return LocalTransactionState.UNKNOW; 31 } 32 }
到这就完了,为何只介绍发送不介绍接收呢?由于一旦消息提交到MQ就不用管了, 要相信MQ会把消息送达consumer,若是消息未能被成功消费的话,那么Producer也会回滚
如何保证分布式系统的全局性事务?
由于阿里在3.2.6版本后,砍掉了消息回查的功能,也就是consumer端是否成功消费,Producer端并不知道,因此若是要保证全局性事务,咱们要有本身的实现机制: