利用本地消息表和MQ解决消息可靠性

本地消息表

ebay方案:https://queue.acm.org/detail.cfm?id=1394128数据库

为解决producer端消息发送和本地事务执行原子性问题,将须要分布式处理的任务经过消息日志方式存储到一个地方,在本地事务完成以前,这个消息对于消费者是不可见的,本地事务执行成功以后,消费者才会看到这个消息并进行消费。服务器

消息日志能够存储到本地文本,数据库或消息队列,经过业务规则自动或者人工方式发起重试。异步

人工重试多用于支付场景,经过对帐系统对过后问题进行处理。 分布式

  • 预发送消息到mq,消费者是看不到此消息的,所以不会进行消费
  • 执行本地事务,好比操做数据库,依赖本地事务
  • 若是本地事务执行成功,则进行mq消息确认。若是失败,则回滚mq消息

对于本地消息队列来讲,把大事务转变成小事务,举个例子:线程

  • 当扣钱时,须要在扣钱当服务器上增长一个本地消息表,须要把扣钱和减去的库写入存放到本地消息表,依靠数据库本地事务保证一致性
  • 定时任务轮训本地消息表,把没有发送的消息发送给商品目标服务器,让其去减库存,达到商品服务的请求先写入本地消息表,进行扣减,扣减成功后,更新消息表状态
  • 商品服务经过定时任务扫描消息表,扣减库存成功后修改本地消息表状态
  • 若是定时任务扫描到没有执行成功的消息,则进行重发,商品服务接受到消息后判断消息是否重复,保证幂等性

本地事务实现设计

能够将消息放到一个地方存起来,好比在数据库中创建一个表,将消息放入这个表,称之为本地事务,这个表中有个state字段表示消息状态,在预发送消息阶段,标记成unkonwn状态。3d

以后根据本地事务执行结果,修改state,执行成功设置为local_commit,执行失败执行local_rollback。代理

同时能够创建一个异步现场执行兜底,不断从这个表中查询状态为local_commit的消息,将其发送到mq中。日志

  • 若是发送mq成功,整个事务能够任务执行结束,修改状态为global_commit,接下来消费者进行消息消费。
  • 若是发送mq失败,能够进行重试,直到成功,若是先限制重试次数,能够在表中增长retry_count字段每次重试就+1,当超太重试阈值后,就再也不发送,能够指定一个消息超时时间,超过期间阈值后,就再也不发送。对于失败的消息,将其标记为message_error,还能够增长一个cause字段,表示由于什么缘由致使消息发送失败。

若是本地消息状态修改失败,那么一个消息可能一直处于unkonwn状态,而异步现成只会发送那些local_commit的消息到mq中,这样一些消息会一直被忽略,就产生了消息丢失。code

通常有三种解决方案:

一:扩大事务边界

将预发送消息,执行本地事务,修改本地事务表状态三个操做,合并到一个事务里面。第一步预发送消息以前就开启事务,在第三步执行结束以后提交或回滚事务,,这样经过事务保证了本地消息表的消息记录,和操做产生记录老是成功或者失败。

二:合并事务状态

能够合并事务状态,直接和正常的数据库操做合并到一个事务中,写入到数据库直接就是local_commit,以后异步现成发送逻辑不变。

三:对prepared状态消息进行检查

简单的操做能够直接执行一次DB事务就能够了,若是复杂的一些场景,好比A业务发起方除了须要操做本地数据库,还须要进行RPC调用查询其余业务B,以获取一些mq消息须要的信息。这样可能A须要先将消息保存下来,等到B能够提供消息以后再发送。

这种状况的策略是,A和B之间约定一个能够异步处理的时间阈值,让异步线程除了发送local_commit状态的消息,还须要对prepared状态消息进行检查。依靠设置的时间阈值,在过滤消息时,prepared消息对时间和当前时间必须知足必定的时间阈值,避免和新事务消息的prepared消息状态混淆。

依赖MQ事务

消息队列的事务实现相似于本地消息表,只不过是将实现放到了MQ内部。

流程以下:

  1. 第一阶段prepared消息,拿到消息地址
  2. 执行本地事务
  3. 经过第一阶段拿到地址去访问消息,并修改状态,消息接收者使用这个消息

若是消息确认操做失败,mq broker会定时扫描没有更新状态的消息,若是有消息没有获得确认,会向消息发送者发送消息,判断是否提交了。 若是消息消费超时了,须要一直重试,消息接收端须要保证幂等,若是消息消费失败,须要人工进行处理,由于几率较低,设计复杂的流程反而得不偿失。

消息队列通常有事务处理方案,能够解决producer发送消息和本地事务执行的原子性操做。 MQ的方案通常也是将消息找个地方存起来,RocketMQ将消息存放到内部主题中。为了支持事务,RocketMQ引入了Half Topic及Operation Topic两个内部队列来存储事务消息推动状态。

  • Half Topic对应队列中存放着prepare消息,就是预发送消息,消息不直接发送到topic,所以消费者对其不可见,实现暂存。
  • Opreation Topic对应的队列存放prepare message对应的commit/rollback消息,消息体中是prepare message对应的offset。

RocketMQ中事务消息发送流程以下:

事务生产者预发送消息

经过TransactionMQProducer发送事务消息,这个producer在一条普通的message上加一些数据,表示这个是一条预发送的事务消息。broker在发现这是一条事务消息的时候,将其放到half topic中。

执行本地事务

发送prepare消息以后,须要执行本地事务,须要实现RocketMQ提供的一个TransactionListener 接口方法完成。

  • executeLocalTransaction方法:用于执行本地事务,能够操做数据库或者干一些别的事情
  • checkLocalTransaction方法:用于检查事务状态

这两个方法返回一个表示本地事务消息的执行状态LocalTransactionState,事务生产者会将其上报给broker,状态以下:

public enum LocalTransactionState {
    COMMIT_MESSAGE,
    ROLLBACK_MESSAGE,
    UNKNOW,
}

本地事务状态处理

生产者拿到状态后上报broker,broker在处理时,会根据状态进行处理。

若是是commit/rollback状态: brokder会把收到的事务消息状态记录在内部的operation topci中,消息体中是prepare message对应在half topci中的offset。

  • 若是是rollback消息,broker将从half topic中删除该prepare消息不进行下发
  • 若是是commit消息,broker会把这个消息取出来,发送到原始的目标topci中,此时consumer端能够消费

上图能够观察到,一些异常状况下,可能上报事务消息状态失败,所以operate topci中没有记录,二者之间的差值通常就是unkown为确认中间状态的消息,须要进行特殊处理。

unknow状态消息

若是是unknow状态消息,说明存在不肯定的事务状态,broker须要主动询问客户端producer。

出现unknow状态通常因为如下缘由形成:

  • 若是本地事务支持过程当中,执行端挂掉,或者超时时会出现异常状态
  • 一些特殊场景,须要等待一段时间知足特定场景,才把消息交给消费者进行消费的,可能须要主动的返回unknow状态,属于有意为之

因为unknown中间状态的消息,不会提交到operation topic中,所以half topci和operation topic这两个内部主题中,服务端经过对比两个主题的差值来找到未被提交的超时任务,进行回查。 因此业务方须要提供一个方法让rocketmq来回调,TransactionListener 中的checkLocalTransaction 就是用于回查。rocketmq会把以前发送的消息看成参数入参,业务实现根据消息内容能够反查业务信息,来肯定状态。 broker主动轮训客户端producer事务状态,依赖于broker和producer端的双向通讯能力来完成的,broker会主动给客户端producer发请求。

Sage事务

Saga事务是将长事务拆分红多个本地短事务,有Saga协调器协调,若是正常结束则算完成,某个步骤失败,则根据相反顺序调用一次补偿操做。

总结

RocketMQ事务消息能够解决事务的一致性问题,事务发起方须要关注本地事务执行及实现回查接口进行事务状态判断。

RocketMQ事务消息处理的限制:

  • 事务消息没有延迟和批量支持,不能使用延迟消息特性和批量发送消息特性
  • 为避免屡次检查单个消息致使half topic消息积累,默认将单个消息的检查次数限制为15次
  • 经过transactionTimeout 配置检查事务消息的固定周期
  • 能够屡次检查和消费事务消息
  • 将事务消息交给目标topic可能会失败,rocketmq自己高可用机制确保高可用性,为保证事务消息不丢失或事务完整性能够采用同步双写机制
  • 事务消息生产者id不能和其余类型消息的生产者id共享,和其余类型消息不一样,事务消息容许后向查询,mq server按照其生产者id查询客户端

订阅数据库binlog

为避免对于业务的入侵,能够采用binlog实现可靠性发送:

  • 先把本地事务执行完成,本地事务每一个数据库更新操做都产生binlog event,event在本地事务成功后,才会产生
  • 经过binlog订阅组件,订阅数据库变动,订阅到binlog event,说明执行了本地事务信息,能够放心根据event解析相应信息,发送到mq便可

kafka的事务消息

kafka producer支持两种模式:幂等生产者和事务生产者。

  • 幂等生产者:将kafka交付语意从at least once增强到exactly once,生产者重试将不会引入重复
  • 事务生产者:容许应用程序以原子方式同时发送消息到多个主题和分区

kafka相似于数据库事务的原子性,能够在kafka以前加个代理,由代理暂存事务消息,条件知足后,再发送到目标topic供消费者消费。

相关文章
相关标签/搜索