消息队列,既然是队列就能保证消息在进入队列,以及出队列的时候保证消息的有序性,显然这是在消息的生产端(Producer),可是每每在生产环境中有多个消息的消费端(Consumer),尽管消费端在拉取消息时是有序的,但各个消息因为网络等方面缘由没法保证在各个消费端中处理时有序。程序员
前后两次修改了商品信息,消息A和消息B前后同步写入MySQL,接着异步写入消息队列中发送消息,此时消息队列生产端(Producer)按时序前后发出了A和B两条消息(消息A先发出,消息B后发出)。按业务逻辑,商品信息的最终状态须要以消息A和消息B综合为准。数据库
看似一个比较常见的同步写数据库,异步发送消息的场景,但实际上须要保证消息的有序消费。缓存
可见,你没法保证消息中包含什么信息,此时必须保证消息的有序消费。安全
下面经过伪代码的方式描述:网络
生产端伪代码异步
insertWare(ware); #插入数据到数据库,一般在插入数据库时咱们只会update修改的字段,而不会全量插入分布式
ware = selectWareById(ware.getId); #获取商品的全量信息(此时是最新的),用于将它放入到消息队列中fetch
syncMq(ware); #异步发送mq消息A线程
消费端伪代码code
ware = fetchWare(); #获取消息
if (isLasted(ware)) #经过商品的修改时间戳判断是不是最新的修改
TODO #执行下一步业务逻辑
else
return #丢弃该消息
重点在于消费端如何判断该消息是不是最新的修改也就是isLasted
方法。
isLasted方法
Long modified = getCacheById(ware.getId); #获取缓存中该条商品的最新修改时间
If (ware.getModified > modified) { #若是消息中商品修改时间大于缓存中的时间,说明是最新操做
setCacheById(ware); #将该条消息的商品修改时间戳写入到缓存中
return true;
} else #若是消息中的商品修改时间小于缓存中的时间,说明该条消息属于“历史操做”,不对其更新 return false;
以上就是经过伪代码的方式,描述如何经过业务手段保证消息有序消费,重点在于全量发送信息和缓存时间戳。在其中还有一些技术实现细节。
例如:消费端消费消息B,执行到获取时间戳缓存以后,并在从新设置新的缓存以前,此时另外一个消费端刚好也正在消费B它也正执行到获取时间戳缓存,因为消息A此时并无更新缓存,消息A拿到的缓存仍然是旧的缓存,这时就会存在两个消费端都认为本身所消费的消息时最新的,形成该丢弃的消息没丢。
显然,这是分布式线程安全问题,分布式锁一般使用Redis或者ZooKeeper,加锁后的执行时序以下图所示。
这是从业务角度保证消息在消费端有序消费。经过在消息发送端全量发送消息以及在消息消费端缓存时间戳就能够保证消息的有序消费。
在上述场景中是先同步写入MySQL,再获取商品全量数据,接着再异步发送消息。这一系列的步骤能够经过接MySQL的binlog实现,在同步写入MySQL后,MySQL发送binlog变动,经过阿里巴巴Canal中间件接收MySQL的binlog变动再发送消息到消息队列。