ebay方案:https://queue.acm.org/detail.cfm?id=1394128数据库
为解决producer端消息发送和本地事务执行原子性问题,将须要分布式处理的任务经过消息日志方式存储到一个地方,在本地事务完成以前,这个消息对于消费者是不可见的,本地事务执行成功以后,消费者才会看到这个消息并进行消费。服务器
消息日志能够存储到本地文本,数据库或消息队列,经过业务规则自动或者人工方式发起重试。异步
人工重试多用于支付场景,经过对帐系统对过后问题进行处理。
分布式
对于本地消息队列来讲,把大事务转变成小事务,举个例子:线程
本地事务实现设计
能够将消息放到一个地方存起来,好比在数据库中创建一个表,将消息放入这个表,称之为本地事务,这个表中有个state字段表示消息状态,在预发送消息阶段,标记成unkonwn状态。3d
以后根据本地事务执行结果,修改state,执行成功设置为local_commit,执行失败执行local_rollback。代理
同时能够创建一个异步现场执行兜底,不断从这个表中查询状态为local_commit的消息,将其发送到mq中。日志
若是本地消息状态修改失败,那么一个消息可能一直处于unkonwn状态,而异步现成只会发送那些local_commit的消息到mq中,这样一些消息会一直被忽略,就产生了消息丢失。code
通常有三种解决方案:
一:扩大事务边界
将预发送消息,执行本地事务,修改本地事务表状态三个操做,合并到一个事务里面。第一步预发送消息以前就开启事务,在第三步执行结束以后提交或回滚事务,,这样经过事务保证了本地消息表的消息记录,和操做产生记录老是成功或者失败。
二:合并事务状态
能够合并事务状态,直接和正常的数据库操做合并到一个事务中,写入到数据库直接就是local_commit,以后异步现成发送逻辑不变。
三:对prepared状态消息进行检查
简单的操做能够直接执行一次DB事务就能够了,若是复杂的一些场景,好比A业务发起方除了须要操做本地数据库,还须要进行RPC调用查询其余业务B,以获取一些mq消息须要的信息。这样可能A须要先将消息保存下来,等到B能够提供消息以后再发送。
这种状况的策略是,A和B之间约定一个能够异步处理的时间阈值,让异步线程除了发送local_commit状态的消息,还须要对prepared状态消息进行检查。依靠设置的时间阈值,在过滤消息时,prepared消息对时间和当前时间必须知足必定的时间阈值,避免和新事务消息的prepared消息状态混淆。
消息队列的事务实现相似于本地消息表,只不过是将实现放到了MQ内部。
流程以下:
若是消息确认操做失败,mq broker会定时扫描没有更新状态的消息,若是有消息没有获得确认,会向消息发送者发送消息,判断是否提交了。 若是消息消费超时了,须要一直重试,消息接收端须要保证幂等,若是消息消费失败,须要人工进行处理,由于几率较低,设计复杂的流程反而得不偿失。
消息队列通常有事务处理方案,能够解决producer发送消息和本地事务执行的原子性操做。 MQ的方案通常也是将消息找个地方存起来,RocketMQ将消息存放到内部主题中。为了支持事务,RocketMQ引入了Half Topic及Operation Topic两个内部队列来存储事务消息推动状态。
RocketMQ中事务消息发送流程以下:
事务生产者预发送消息
经过TransactionMQProducer发送事务消息,这个producer在一条普通的message上加一些数据,表示这个是一条预发送的事务消息。broker在发现这是一条事务消息的时候,将其放到half topic中。
执行本地事务
发送prepare消息以后,须要执行本地事务,须要实现RocketMQ提供的一个TransactionListener 接口方法完成。
这两个方法返回一个表示本地事务消息的执行状态LocalTransactionState,事务生产者会将其上报给broker,状态以下:
public enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW, }
本地事务状态处理
生产者拿到状态后上报broker,broker在处理时,会根据状态进行处理。
若是是commit/rollback状态: brokder会把收到的事务消息状态记录在内部的operation topci中,消息体中是prepare message对应在half topci中的offset。
上图能够观察到,一些异常状况下,可能上报事务消息状态失败,所以operate topci中没有记录,二者之间的差值通常就是unkown为确认中间状态的消息,须要进行特殊处理。
unknow状态消息
若是是unknow状态消息,说明存在不肯定的事务状态,broker须要主动询问客户端producer。
出现unknow状态通常因为如下缘由形成:
因为unknown中间状态的消息,不会提交到operation topic中,所以half topci和operation topic这两个内部主题中,服务端经过对比两个主题的差值来找到未被提交的超时任务,进行回查。 因此业务方须要提供一个方法让rocketmq来回调,TransactionListener 中的checkLocalTransaction 就是用于回查。rocketmq会把以前发送的消息看成参数入参,业务实现根据消息内容能够反查业务信息,来肯定状态。 broker主动轮训客户端producer事务状态,依赖于broker和producer端的双向通讯能力来完成的,broker会主动给客户端producer发请求。
Saga事务是将长事务拆分红多个本地短事务,有Saga协调器协调,若是正常结束则算完成,某个步骤失败,则根据相反顺序调用一次补偿操做。
RocketMQ事务消息能够解决事务的一致性问题,事务发起方须要关注本地事务执行及实现回查接口进行事务状态判断。
RocketMQ事务消息处理的限制:
订阅数据库binlog
为避免对于业务的入侵,能够采用binlog实现可靠性发送:
kafka的事务消息
kafka producer支持两种模式:幂等生产者和事务生产者。
kafka相似于数据库事务的原子性,能够在kafka以前加个代理,由代理暂存事务消息,条件知足后,再发送到目标topic供消费者消费。