2018阿里云所有产品优惠券(好东东,强烈推荐)
领取地址:https://promotion.aliyun.com/ntms/act/ambassador/sharetouser.html?userCode=gh9qh5ki&utm_source=gh9qh5kicss
讲座html
本话题已收入视频讲座《Spring Cloud分布式事务解决方案》你们不妨围观下java
开源项目git
咱们利用消息队列实现了分布式事务的最终一致性解决方案,请你们围观。能够参考Github CoolMQ源码,项目支持网站: http://rabbitmq.org.cn,最新文章或实现会更新在上面github
阿里2017云栖大会《破解世界性技术难题!GTS让分布式事务简单高效》中,阿里声称提出了一种破解世界性难题之分布式事务的终极解决方案,不管是可靠性、仍是处理速率都领先于市面上全部的技术。但使人遗憾的是一来项目未开源,二来还必须依赖阿里云的分布式数据库。毕竟,吃饭的家伙可不能轻易示人嘛。算法
虽然如此,但《世界难题...》一文中对事务仍是概括的仍是蛮到位的:“一个看似简单的功能,内部可能须要调用多个“服务”并操做多个数据库或分片来实现,单一技术手段和解决方案已没法知足这些复杂应用场景。所以,分布式系统架构中分布式事务是一个绕不过去的挑战。数据库
什么是分布式事务?简单的说,就是一次大操做由不一样小操做组成,这些小操做分布在不一样服务器上,分布式事务须要保证这些小操做要么所有成功,要么所有失败。”segmentfault
举个栗子:缓存
你上Taobao买东西,须要先扣钱,而后商品库存-1吧。但扣款和库存分别属于两个服务,这两个服务中间要通过网络、网关、主机等一系列中间层,万一任何一个地方出了问题,好比网络抖动、突发异常等待,都会致使不一致,好比扣款成功了,可是库存没-1,就会出现超卖的现象,而这就是分布式事务须要解决的问题服务器
2阶段提交是分布式事务传统解决方案,先进为止还普遍存在。当一个事务跨越多个节点时,为了保持事务ACID
特性,须要引入一个做为协调者来统一掌控全部节点(称做参与者)的操做结果并最终指示这些节点是否要把操做结果进行真正的提交(好比将更新后的数据写入磁盘等等)。所以,二阶段提交的算法思路能够归纳为:参与者将操做成败通知协调者,再由协调者根据全部参与者的反馈情报决定各参与者是否要提交操做仍是停止操做。
以开会为例
甲乙丙丁四人要组织一个会议,须要肯定会议时间,不妨设甲是协调者,乙丙丁是参与者。
投票阶段
提交阶段
不只要锁住参与者的全部资源,并且要锁住协调者资源,开销大。一句话总结就是:2PC效率很低,对高并发很不友好。
引用《世界性难题...》
一文原话 "国外具备几十年历史和技术沉淀的基于XA模型的商用分布式事务产品,在相同软硬件条件下,开启分布式事务后吞吐常常有数量级的降低。"
此外还有三阶段提交
你们有兴趣的不妨研究下
所谓柔性事务是相对强制锁表的刚性事务而言。流程入下:服务器A的事务若是执行顺利,那么事务A就先行提交,若是事务B也执行顺利,则事务B也提交,整个事务就算完成。可是若是事务B执行失败,事务B自己回滚,这时事务A已经被提交,因此须要执行一个补偿操做,将已经提交的事务A执行的操做做反操做,恢复到未执行前事务A的状态。
缺点是业务侵入性太强,还要补偿操做,缺少广泛性,无法大规模推广。
目前基于消息队列的解决方案有阿里的RocketMQ
,它实现了半消息
的解决方案,有点相似于Paxos算法,具体流程以下
[关于分布式事务,工程领域主要讨论的是强一致性和最终一致性的解决方案。典型方案包括: 两阶段提交(2PC, Two-phase Commit)方案 eBay 事件队列方案 TCC 补偿模式 缓存
第一阶段:上游应用执行业务并发送MQ消息
可靠消息系统修改消息状态为发送状态并将消息投递到 MQ 中间件
第二阶段:下游应用监听 MQ 消息并执行业务
下游应用监听 MQ 消息并执行业务,而且将消息的消费结果通知可靠消息服务。
RocketMQ
貌似是一种先进的实现方案了,但问题是缺少文档
,不管是在Apache项目主页,仍是在阿里的页面上,最多只告诉你如何用,而原理性或者指导性的东西很是缺少。
固然,若是你在阿里云上专门购买了RocketMQ
服务,想必是另当别论了。但若是你试图在本身的服务环境中部署和使用,想必要历经至关大的学习曲线。毕竟是人家吃饭的家伙嘛
RabbitMQ
遵循了AMQP规范
,用消息确认机制来保证:只要消息发送,就能确保被消费者消费来作到了消息最终一致性。并且开源,文档还异常丰富,貌似是实现分布式事务的良好载体
rabbitmq的整个发送过程以下
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (!ack) { //try to resend msg } else { //delete msg in db } });
final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { //确认收到消息 channel.basicAck(envelope.getDeliveryTag(), false); } } };
咱们来看看可能发送异常的四种
网络断了,抛出异常,业务直接回滚便可。若是出现connection closed
错误,直接增长 connection
数便可
connectionFactory.setChannelCacheSize(100);
rabbitmq
提供了确认ack机制,能够用来确认消息是否有返回。所以咱们能够在发送前在db中(内存或关系型数据库)先存一下消息,若是ack异常则进行重发
/**confirmcallback用来确认消息是否有送达消息队列*/ rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (!ack) { //try to resend msg } else { //delete msg in db } }); /**若消息找不到对应的Exchange会先触发returncallback */ rabbitTemplate.setReturnCallback((message, replyCode, replyText, tmpExchange, tmpRoutingKey) -> { try { Thread.sleep(Constants.ONE_SECOND); } catch (InterruptedException e) { e.printStackTrace(); } log.info("send message failed: " + replyCode + " " + replyText); rabbitTemplate.send(message); });
若是设置了消息持久化,那么ack= true
是在消息持久化完成后,就是存到硬盘上以后再发送的,确保消息已经存在硬盘上,万一消息服务挂了,消息服务恢复是可以再重发消息
消息服务收到消息后,消息会处于"UNACK"的状态,直到客户端确认消息
channel.basicQos(1); // accept only one unack-ed message at a time (see below) final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { //确认收到消息 channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
消息返回时假设确认消息丢失了,那么消息服务会重发消息。注意,若是你设置了autoAck= false
,但又没应答channel.baskAck
也没有应答channel.baskNack
,那么会致使很是严重的错误:消息队列会被堵塞住,因此,不管如何都必须应答
消息监听接受消息并处理,假设抛异常了,第一阶段事物已经完成,若是要配置回滚则过于麻烦,即便作事务补偿也可能事务补偿失效的状况,因此这里能够作一个重复执行,好比guava
的retry
,设置一个指数时间来循环执行,若是n次后依然失败,发邮件、短信,用人肉来兜底。
《世界性难题...》
一文中对分布式事务的几种实现方式进行了形象概括
你天天上班,要通过一条10千米的只有两条车道的马路到达公司。这条路很堵,常常须要两三个小时,上班时间没有保证,这是2PC的问题-慢。
选择一条很绕,长30千米但不多堵车的路,这是选b。上班时间有保证,可是必须早起,付出足够的时间和汽油。这是柔性事务的问题,必须用具体业务来回滚,很难模块化
选择一条有点绕,长20千米的山路,路不平,只有suv能够走,这是事务消息最终一致性问题。引入了新的消息中间件,须要额外的开发成本。但我司开发的CoolMQ已经对组件进行了封装,只须要发送,接受,就能知足事务的要求。目前还有该方案的专题讲座,你们能够根据本身的须要选用。
最后是GTS
,GTS
修了一条拥有4条车道的高架桥,没有绕路,仍是10千米。不堵车,对事务来讲是高性能;不绕路,对事务来讲是简单易用,对业务无侵入,不用为事务而重构;没有车型限制,对事务来讲是没有功能限制,提供强一致事务。在没有高架桥的时代,高架桥出现对交通来讲就是一个颠覆性创新,不少之前看来无解的问题就迎刃而解了,一样的,GTS但愿经过创新改变数据一致性处理的行业现状。但遗憾的是并未开源
,并且须要结合阿里云服务
来使用。