EQueue - 一个C#写的开源分布式消息队列的整体介绍

前言

本文想介绍一下前段时间在写enode时,顺便实现的一个分布式消息队列equeue。这个消息队列的思想不是我想出来的,而是经过学习阿里的rocketmq后,本身用c#实现了一个轻量级的简单版本。一方面能够经过写这个队列让本身更深刻的掌握消息队列的一些常见问题;另外一方面也能够用来和enode集成,为enode中的command和domain event的消息传递提供支持。目前在.net平台,比较好用的消息队列,最多见的是微软的MSMQ了吧,还有像rabbitmq也有.net的client端。这些消息队列都很强大和成熟。但当我学习了kafka以及阿里的rocketmq(早期版本叫metaq,自metaq 3.0后更名为rocketmq)后,以为rocketmq的设计思想深深吸引了我,由于我不只能理解其思想,还有其完整的源代码能够学习。可是rocketmq是java写的,且目前尚未.net的client端,因此不能直接使用(有兴趣的朋友能够为其写一个.net的client端),因此在学习了rocketmq的设计文档以及大部分代码后,决定本身用c#写一个出来。html

项目开源地址:https://github.com/tangxuehua/equeue,项目中包含了队列的所有源代码以及如何使用的示例。也能够在enode项目中看到如何使用。java

EQueue消息队列中的专业术语

Topic

一个topic就是一个主题。一个系统中,咱们能够对消息划分为一些topic,这样咱们就能经过topic,将消息发送到不一样的queue。node

Queue

一个topic下,咱们能够设置多个queue,每一个queue就是咱们平时所说的消息队列;由于queue是彻底从属于某个特定的topic的,因此当咱们要发送消息时,老是要指定该消息所属的topic是什么。而后equeue就能知道该topic下有几个queue了。可是到底发送到哪一个queue呢?好比一个topic下有4个queue,那对于这个topic下的消息,发送时,到底该发送到哪一个queue呢?那一定有个消息被路由的过程。目前equeue的作法是在发送一个消息时,须要用户指定这个消息对应的topic以及一个用来路由的一个object类型的参数。equeue会根据topic获得全部的queue,而后根据该object参数经过hash code而后取模queue的个数最后获得要发送的queue的编号,从而知道该发送到哪一个queue。这个路由消息的过程是在发送消息的这一方作的,也就是下面要说的producer。之因此不在消息服务器上作是由于这样可让用户本身决定该如何路由消息,具备更大的灵活性。git

Producer

就是消息队列的生产者。咱们知道,消息队列的本质就是实现了publish-subscribe的模式,即生产者-消费者模式。生产者生产消息,消费者消费消息。因此这里的Producer就是用来生产和发送消息的。github

Consumer

就是消息队列的消费者,一个消息能够有多个消费者。算法

Consumer Group

消费者分组,这可能对你们来讲是一个新概念。之因此要搞出一个消费者分组,是为了实现下面要说的集群消费。一个消费者分组中包含了一些消费者,若是这些消费者是要集群消费,那这些消费者会平均消费该分组中的消息。sql

Broker

equeue中的broker负责消息的中转,即接收producer发送过来的消息,而后持久化消息到磁盘,而后接收consumer发送过来的拉取消息的请求,而后根据请求拉取相应的消息给consumer。因此,broker能够理解为消息队列服务器,提供消息的接收、存储、拉取服务。可见,broker对于equeue来讲是核心,它绝对不能挂,一旦挂了,那producer,consumer就没法实现publish-subscribe了。c#

集群消费

集群消费是指,一个consumer group下的consumer,平均消费topic下的queue。具体如何平都可以看一下下面的架构图,这里先用文字简单描述一下。假如一个topic下有4个queue,而后当前有一个consumer group,该分组下有4个consumer,那每一个consumer就被分配到该topic下的一个queue,这样就达到了平均消费topic下的queue的目的。若是consumer group下只有两个consumer,那每一个consumer就消费2个queue。若是有3个consumer,则第一个消费2个queue,后面两个每一个消费一个queue,从而达到尽可能平均消费。因此,能够看出,咱们应该尽可能让consumer group下的consumer的数目和topic的queue的数目一致或成倍数关系。这样每一个consumer消费的queue的数量老是同样的,这样每一个consumer服务器的压力才会差很少。当前前提是这个topic下的每一个queue里的消息的数量老是差很少多的。这点咱们能够对消息根据某个用户本身定义的key来进行hash路由来保证。缓存

广播消费

广播消费是指一个consumer只要订阅了某个topic的消息,那它就会收到该topic下的全部queue里的消息,而无论这个consumer的group是什么。因此对于广播消费来讲,consumer group没什么实际意义。consumer能够在实例化时,咱们能够指定是集群消费仍是广播消费。安全

消费进度(offset)

消费进度是指,当一个consumer group里的consumer在消费某个queue里的消息时,equeue是经过记录消费位置(offset)来知道当前消费到哪里了。以便该consumer重启后继续从该位置开始消费。好比一个topic有4个queue,一个consumer group有4个consumer,则每一个consumer分配到一个queue,而后每一个consumer分别消费本身的queue里的消息。equeue会分别记录每一个consumer对其queue的消费进度,从而保证每一个consumer重启后知道下次从哪里开始继续消费。实际上,也许下次重启后不是由该consumer消费该queue了,而是由group里的其余consumer消费了,这样也不要紧,由于咱们已经记录了这个queue的消费位置了。因此能够看出,消费位置和consumer其实无关,消费位置彻底是queue的一个属性,用来记录当前被消费到哪里了。另一点很重要的是,一个topic能够被多个consumer group里的consumer订阅。不一样consumer group里的consumer即使是消费同一个topic下的同一个queue,那消费进度也是分开存储的。也就是说,不一样的consumer group内的consumer的消费彻底隔离,彼此不受影响。还有一点就是,对于集群消费和广播消费,消费进度持久化的地方是不一样的,集群消费的消费进度是放在broker,也就是消息队列服务器上的,而广播消费的消费进度是存储在consumer本地磁盘上的。之因此这样设计是由于,对于集群消费,因为一个queue的消费者可能会更换,由于consumer group下的consumer数量可能会增长或减小,而后就会从新计算每一个consumer该消费的queue是哪些,这个能理解的把?因此,当出现一个queue的consumer变更的时候,新的consumer如何知道该从哪里开始消费这个queue呢?若是这个queue的消费进度是存储在前一个consumer服务器上的,那就很难拿到这个消费进度了,由于有可能那个服务器已经挂了,或者下架了,都有可能。而由于broker对于全部的consumer老是在服务的,因此,在集群消费的状况下,被订阅的topic的queue的消费位置是存储在broker上的,存储的时候按照不一样的consumer group作隔离,以确保不一样的consumer group下的consumer的消费进度互补影响。而后,对于广播消费,因为不会出现一个queue的consumer会变更的状况,因此咱们不必让broker来保存消费位置,因此是保存在consumer本身的服务器上。

EQueue是什么?

经过上图,咱们能直观的理解equeue。这个图是从rocketmq的设计文档中拿来的,呵呵。因为equeue的设计思想彻底和rocketmq一致,因此我就拿过来用了。每一个producer能够向某个topic发消息,发送的时候根据某种路由策略(producer可自定义)将消息发送到某个特定的queue。而后consumer能够消费特定topic下的queue里的消息。上图中,TOPIC_A有两个消费者,这两个消费者是在一个group里,因此应该平均消费TOPIC_A下的queue但因为有三个queue,因此第一个consumer分到了2个queue,第二个consumer分到了1个。对于TOPIC_B,因为只有一个消费者,那TOPIC_B下的全部queue都由它消费。全部的topic信息、queue信息、还有消息自己,都存储在broker服务器上。这点上图中没有体现出来。上图主要关注producer,consumer,topic,queue这四个东西之间的关系,并不关注物理服务器的部署架构。

关键问题的思考

1.Producer,Broker,Consumer三者之间如何通讯

因为是用c#实现,且由于通常是在局域网内部署,为了实现高性能通讯,咱们能够利用异步socket来通讯。.net自己提供了很好的异步socket通讯的支持;咱们也能够用zeromq来实现高性能的socket通讯。原本想直接使用zeromq来实现通讯模块就行了,但后来本身学习了一下.net自带的socket通讯相关知识,发现也不难,因此就本身实现了一个,呵呵。本身实现的好处是我能够本身定义消息的协议,目前这部分实现代码在ecommon基础类库中,是一个独立的可服用的与业务场景无关的基础类库。有兴趣的能够去下载下来看看代码。通过了本身的一些性能测试,发现通讯模块的性能仍是不错的。一台broker,四台producer同时向这个broker发送消息,每秒能发送的消息4W没有问题,更多的producer还没测试。

2.消息如何持久化

消息持久化方面主要考虑的是性能问题,还有就是消息如何快速的读取。

1. 首先,一台broker上的消息不须要一直保存在该broker服务器上,由于这些消息总会被消费掉。根据阿里rocketmq的设计,默认会1天删除一次已经被消费过的消息。因此,咱们能够理解,broker上的消息应该不会无限制增加,由于会被按期删除。因此没必要考虑一台broker上消息放不下的问题。

2. 如何快速的持久化消息?通常来讲,我以为有两种方式:1)顺序写磁盘文件;2)用现成的key,value的nosql产品来存储;rocketmq目前用的是本身写文件的方式,这种方式的难点是写文件比较复杂,由于全部消息都是顺序append到文件末尾,虽然性能很是高,但复杂度也很高;好比全部消息不能全写在一个文件里,一个文件到达必定大小后须要拆分,一旦拆分就会产生不少问题,呵呵。拆分后如何读取也是比较复杂的问题。还有因为是顺序写入文件的,那咱们还须要把每个消息在文件中的起始位置和长度须要记录下来,这样consumer在消费消息时,才能根据offset从文件中拿到该消息。总之须要考虑的问题不少。若是是用nosql来持久化消息,那能够省去咱们写文件时遇到的各类问题,咱们只须要关心如何把消息的key和该消息在queue中的offset对应起来便可。另一点疑问是,queue里的信息要持久化吗?先要想清楚queue里放的是什么东西。当broker接收到一个消息后,首先确定是要先持久化,完成后须要把消息放入queue里。但因为内存颇有限,咱们不可能把这个消息直接放入queue里,咱们其实要放的只须要时该消息在nosql里的key便可,或者若是是用文件来持久化,那放的是该消息在文件中的偏移量offset,即存储在文件的那个位置(好比是哪一个行号)。因此,实际上,queue只是一个消息的索引。那有必要持久化queue吗?能够持久化,这样毕竟在broker重启的时候,恢复queue的时间能够缩短。那须要和持久化消息同步持久化吗?显然不须要,咱们能够异步定时持久化每一个queue,而后恢复queue的时候,能够先从持久化的部分恢复,而后再把剩下的部分经过持久化的消息来补充以达到queue由于异步持久化而慢的部分能够追平。因此,通过上面的分析,消息自己都是放在nosql中,queue所有在内存中。

那消息如何持久化呢?我以为最好的办法是让每一个消息有一个全局的顺序号,一旦消息被写入nosql后,该消息的全局顺序号就肯定了,而后咱们在更新对应的queue的信息时,把该消息的全局顺序号传给queue,这样queue就能把queue本身对该消息的本地顺序号和该消息的全局顺序号创建映射关系。相关代码以下:

public MessageStoreResult StoreMessage(Message message, int queueId)
{
    var queues = GetQueues(message.Topic);
    var queueCount = queues.Count;
    if (queueId >= queueCount || queueId < 0)
    {
        throw new InvalidQueueIdException(message.Topic, queueCount, queueId);
    }
    var queue = queues[queueId];
    var queueOffset = queue.IncrementCurrentOffset();
    var storeResult = _messageStore.StoreMessage(message, queue.QueueId, queueOffset);
    queue.SetMessageOffset(queueOffset, storeResult.MessageOffset);
    return storeResult;
}

没什么比代码更能说明问题了,呵呵。上的代码的思路是,接收一个消息对象和一个queueId,queueId表示当前消息要放到第几个queue里。而后内部逻辑是,先获取该消息的topic的全部queue,因为queue和topic都在内存,因此这里没性能问题。而后检查一下当前传递进来的queueId是否合法。若是合法,那就定位到该queue,而后经过IncrementCurrentOffset方法,将queue的内部序号加1并返回,而后持久化消息,持久化的时候把queueId以及queueOffset一块儿持久化,完成后返回一个消息的全局序列号。因为messageStore内部会把消息内容、queueId、queueOffset,以及消息的全局顺序号一块儿做为一个总体保存到nosql中,key就是消息的全局序列号,value就是前面说的总体(被序列化为二进制)。而后,在调用queue的SetMessageOffset方法,把queueOffset和message的全局offset创建映射关系便可。最后返回一个结果。messageStore.StoreMessage的内存实现大体以下:

public MessageStoreResult StoreMessage(Message message, int queueId, long queueOffset)
{
    var offset = GetNextOffset();
    _queueCurrentOffsetDict[offset] = new QueueMessage(message.Topic, message.Body, offset, queueId, queueOffset, DateTime.Now);
    return new MessageStoreResult(offset, queueId, queueOffset);
}

GetNextOffset就是获取下一个全局的消息序列号,QueueMessage就是上面所说的“总体”,由于是内存实现,因此就用了一个ConcurrentDictionary来保存一下queueMessage对象。若是是用nosql来实现messageStore,则这里须要写入nosql,key就是消息的全局序列号,value就是queueMessage的二进制序列化数据。经过上面的分析咱们能够知道咱们会将消息的全局序列号+queueId+queueOffset一块儿总体做为一条记录持久化起来。这样作有两个很是好的特性:1)实现了消息持久化和消息在queue中的位置的持久化的原子事务;2)咱们老是能够根据这些持久化的queueMessage还原出全部的queue的信息,由于queueMessage里包含了消息和消息在queue的中的位置信息;

基于这样的消息存储,当某个consumer要消费某个位置的消息时,咱们能够经过先经过queueId找到queue,而后经过消息在queueOffset(由consumer传递过来的)获取消息的全局offset,而后根据该全局的offset做为key从nosql拿到消息。实际上如今的equeue是批量拉取消息的,也就是一次socket请求不是拉一个消息,而是拉一批,默认是32个消息。这样consumer能够用更少的网络请求拿到更多的消息,能够加快消息消费的速度。

3.Producer发送消息时的消息路由的细节

producer在发送消息时,如何知道当前topic下有多少个queue呢?每次发送消息时都要去broker上查一下吗?显然不行,这样发送消息的性能就上不去了。那怎么办呢?就是异步,呵呵。producer能够定时向broker发送请求,获取topic下的queue数量,而后保存起来。这样每次producer在发送消息时,就只要从本地缓存里拿便可。由于broker上topic的queue的数量通常不会变化,因此这样的缓存颇有意义。那还有一个问题,当前producer第一次对某个topic发送消息时,queue哪里来呢?由于定时线程不知道要向broker拿哪一个topic下的queue数量,由于此时producer端尚未一个topic呢,由于一个消息都还没发送过。那就是须要判断一下,若是当前topic没有queue的count信息,则直接从broker上获取queue的count信息。而后再缓存起来,在发送当前消息。而后第二次发送时,由于缓存里已经有了该消息,因此就没必要再从broker拿了,且后续定时线程也会自动去更新该topic下的queue的count了。好,producer有了topic的queue的count,那用户在发送消息时,框架就能把这个topic的queueCount传递给用户,而后用户就能根据本身的须要将消息路由到第几个queue了。

4.consumer负载均衡如何实现

consumer负载均衡的意思是指,在消费者集群消费的状况下,如何让同一个consumer group里的消费者平均消费同一个topic下的queue。因此这个负载均衡本质上是一个将queue平均分配给consumer的过程。那么怎么实现呢?经过上面负载均衡的定义,咱们只要,要作负载均衡,必需要肯定consumer group和topic;而后拿到consumer group下的全部consumer,以及topic下的全部queue;而后对于当前的consumer,就能计算出来当前consumer应该被分配到哪些queue了。咱们能够经过以下的函数来获得当前的consumer应该被分配到哪几个queue。

public class AverageAllocateMessageQueueStrategy : IAllocateMessageQueueStrategy
{
    public IEnumerable<MessageQueue> Allocate(string currentConsumerId, IList<MessageQueue> totalMessageQueues, IList<string> totalConsumerIds)
    {
        var result = new List<MessageQueue>();

        if (!totalConsumerIds.Contains(currentConsumerId))
        {
            return result;
        }

        var index = totalConsumerIds.IndexOf(currentConsumerId);
        var totalMessageQueueCount = totalMessageQueues.Count;
        var totalConsumerCount = totalConsumerIds.Count;
        var mod = totalMessageQueues.Count() % totalConsumerCount;
        var size = mod > 0 && index < mod ? totalMessageQueueCount / totalConsumerCount + 1 : totalMessageQueueCount / totalConsumerCount;
        var averageSize = totalMessageQueueCount <= totalConsumerCount ? 1 : size;
        var startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        var range = Math.Min(averageSize, totalMessageQueueCount - startIndex);

        for (var i = 0; i < range; i++)
        {
            result.Add(totalMessageQueues[(startIndex + i) % totalMessageQueueCount]);
        }

        return result;
    }
}

函数里的实现就很少分析了。这个函数的目的就是根据给定的输入,返回当前consumer该分配到的queue。分配的原则就是平均分配。好了,有了这个函数,咱们就能很方便的实现负载均衡了。咱们能够对每个正在运行的consumer内部开一个定时job,该job每隔一段时间进行一次负载均衡,也就是执行一次上面的函数,获得当前consumer该绑定的最新queue。由于每一个consumer都有一个groupName属性,用于表示当前consumer属于哪一个group。因此,咱们就能够在负载均衡时到broker获取当前group下的全部consumer;另外一方面,由于每一个consumer都知道它本身订阅了哪些topic,因此有了topic信息,就能获取topic下的全部queue的信息了,有了这两样信息,每一个consumer就能本身作负载均衡了。先看一下下面的代码:

_scheduleService.ScheduleTask(Rebalance, Setting.RebalanceInterval, Setting.RebalanceInterval);
_scheduleService.ScheduleTask(UpdateAllTopicQueues, Setting.UpdateTopicQueueCountInterval, Setting.UpdateTopicQueueCountInterval);
_scheduleService.ScheduleTask(SendHeartbeat, Setting.HeartbeatBrokerInterval, Setting.HeartbeatBrokerInterval);

每一个consumer内部都会启动三个定时的task,第一个task表示要定时作一次负载均衡;第二个task表示要定时更新当前consumer订阅的全部topic的queueCount信息,并把最新的queueCount信息都保存在本地;第三个task表示当前consumer会向broker定时发送心跳,这样broker就能经过心跳知道某个consumer是否还活着,broker上维护了全部的consumer信息。一旦有新增或者发现没有及时发送心跳过来的consumer,就会认为有新增或者死掉的consumer。由于broker上维护了全部的consumer信息,因此他就能提供查询服务,好比根据某个consumer group查询该group下的consumer。

经过这三个定时任务,就能完成消费者的负载均衡了。先看一下Rebalance方法:

private void Rebalance()
{
    foreach (var subscriptionTopic in _subscriptionTopics)
    {
        try
        {
            RebalanceClustering(subscriptionTopic);
        }
        catch (Exception ex)
        {
            _logger.Error(string.Format("[{0}]: rebalanceClustering for topic [{1}] has exception", Id, subscriptionTopic), ex);
        }
    }
}

代码很简单,就是对每一个订阅的topic作负载均衡处理。再看一下RebalanceClustering方法:

View Code

上面的代码很少分析了,就是先根据consumer group和topic获取全部的consumer,而后对consumer作排序处理。之因此要作排序处理是为了确保负载均衡时对已有的分配状况尽可能不发生改变。接下来就是从本地获取topic下的全部queue,一样根据queueId作一下排序。而后就是调用上面的分配算法计算出当前consumer应该分配到哪些queue。最后调用UpdatePullRequestDict方法,用来对新增或删除的queue作处理。对于新增的queue,要建立一个独立的worker线程,开始从broker拉取消息;对于删除的queue,要中止其对应的work,中止拉取消息。

经过上面的介绍和分析,咱们你们知道了equeue是如何实现消费者的负载均衡的。咱们能够看出,由于每一个topic下的queue的更新是异步的定时的,且负载均衡自己也是定时的,且broker上维护的consumer的信息也不是事实的,由于每一个consumer发送心跳到broker不是实时发送的,而是好比每隔5s发送一次。全部这些由于都是异步的设计,因此可能会致使在负载均衡的过程当中,同一个queue可能会被两个消费者同时消费。这个就是所谓的,咱们只能作到一个消息至少被消费一次,但equeue层面作不到一个消息只会被消费一次。实际上像rocketmq这种也是这样的思路,放弃一个消息只会被消费一次的实现(由于代价太大,且过于复杂,实际上对于分布式的环境,不太可能作到一个消息只会被消费一次),而是采用确保一个消息至少会被消费一次(即at least once).因此使用equeue,应用方要本身作好对每一个消息的幂等处理。

5.如何实现实时消息推送

消息的实时推送,通常有两种作法:推模式(push)和拉模式(pull)。push的方式是指broker主动对全部订阅了该topic的消费者推送消息;pull的方式是指消费者主动到broker上拉取消息;对于推模式,最大的好处就是实时,由于一有新的消息,就会当即推送给消费者。可是有一个缺点就是若是消费者来不及消费,它也会给消费者推消息,这样就会致使消费者端的消息会堵塞。而经过拉的方式,有两种实现:1)轮训的方式拉,好比每隔5s轮训一下是否有新消息,这种方式的缺点是消息不实时,可是消费进度彻底由消费者本身把控了;2)开长链接的方式来拉,就是不轮训,消费者和broker之间一直保持的链接通道,而后broker一有新消息,就会利用这个通道把消息发送给消费者。

equeue中目前采用的是经过长链接拉取消息的方式。长链接经过socket长链接实现。可是虽然叫长链接,也不是一直不断开,而是也会设计一个超时的限制,好比一个长链接最大不超过15s,超过15s,则broker发送回复给consumer,告诉consumer当前没有新消息;而后consumer接受到这个回复后,就知道要继续发起下一个长链接来拉取。而后假如在这15s中以内,broker上有新消息了,则broker就能当即主动利用这个长链接通知相应的消费者,把消息传给消费者。因此,能够看出,broker上在处理消费者的拉取消息的请求时,若是当前没有新消息,则会hold住这个socket链接,最多hold 15s,超过15s,则发送返回信息,告诉消费者当前无消息,而后消费者再次发送pull message request过来。经过这样的基于长链接的拉取模式,咱们能够实现两个好处:1)消息实时推送;2)由消费者控制消息消费进度;

另外,equeue里还实现了消费者自身的自动限流功能。就是假如当前broker上消息不少,即生产者生产消息的速度大于消费者消费消息的速度,那broker上就会有消息被堆积。那此时消费者在拉取消息时,老是会有新消息拉取到,可是消费者又来不及处理这么多消息。因此equeue框架内置了一个限流(流控,流量控制)的设计,就是能够容许用于配制一个消费者端堆积的消息的上限,好比3000,超过这个数目(可配置),则equeue会让消费者以慢一点的频率拉取消息。好比延迟个多少毫秒(延迟时间可配置)再拉取。这样就简单的实现了流控的目的。

6.如何处理消息消费失败的状况

做为一个消息队列,消费者老是可能会在消费消息时抛出异常,在equeue中这种状况就是消息消费失败的状况。经过上面的消费进度的介绍,你们知道了每一个queue对某个特定的consumer group,都有一个惟一的消费进度。实际上,消息被拉取到consumer本地后,可能会被以两种方式消费,一种是并行消费,一种是线性消费。

并行消费的意思是,假如当前一次性拉取过来32个消息,那equeue会经过启动task(即开多线程)的方式并行消费每一个消息;

线性消费的意思是,消息是在一个独立的单线程中顺序消费,消费顺序和拉取过来的顺序相同。

对于线性消费,假如前一个消息消费的时候失败了,也就是抛异常了,那该怎么办呢?可能想到的办法是重试个3次,可是要是重试后仍是失败呢?总不能由于这个消息而致使后面的消息没法把消费吧?呵呵!对于这种状况,先说一下rocketmq里的处理方式吧:它的作法是,当遇到消费失败的状况,没有立马重试,而是直接把这个消息发送到broker上的某个重试队列,发送成功后,就能够往下消费下一个消息了。由于一旦发送到重试队列,那意味着这个消息就最后老是会被消费了,由于该消息不会丢了。可是要是发送到broker的重试队列也不成功呢?这个?!其实这种状况不大应该出现,若是出现,那基本就是broker挂了,呵呵。

rocketmq中,对于这种状况,那会把这个失败的消息放入本地内存队列,慢慢消费它。而后继续日后消费后面的消息。如今你必定很关心queue的offset是如何更新的?这里涉及到一个滑动门的概念。当一批消息从broker拉取到消费者本地后,并非立刻消费的,而是先放入一个本地的SortedDictionary,key就是消息在queue里的位置,value就是消息自己。由于是一个排序的dictionary,因此key最小的消息意味着是最前面的消息,最大的消息就是最后面的消息。而后无论是并行消费仍是线性消费,只要某个消息被消费了,那就从这个SortedDictionary里移除掉。每次被移除一个消息时,老是会返回当前这个SortedDictionary里的最小的key,而后咱们就能判断这个key是否和上次比是否前移了,若是是,则更新queue的这个最新的offset。由于每次移除一个消息的时候,老是返回当前SortedDictionary里的最小的key,因此,假如当前offset是3,而后offset为4的这个消息一直消费失败,因此不会被移除,可是offset为5,6,7,8的这些消息虽然都消费成功了,可是只要offset为4的这个消息没有被移除,那最小的key就不会往前移动。这个就是所谓的滑动门的概念了。就比如是在铁轨上一辆在跑的动车,offset的往前移动就比如是动车在不断往前移动。由于咱们但愿offset老是会不断往前移动,因此不但愿前面的某个消费失败的消息让这个滑动门中止移动(即咱们老是但愿这个最小的key能不断变大),因此咱们会千方百计让消费失败的消息能不阻碍滑动门的往前移动。因此才把消费失败的消息放入重试队列。

另一点须要注意一下:并非每次成功消费完一个消息,就会立马告诉broker更新offset,由于这样那性能确定很低,broker也会忙死,更好的办法是先只是在本地内存更新queue的offset,而后定时好比5s一次,将最新的offset更新到broker。因此,由于这个异步的存在,一样也会致使某个消息被重复消费的可能性,由于broker上的offset确定比实际的消费进度要慢,有5s的时间差。因此,再次强调,应用方必需要处理好对消息的幂等处理!好比enode框架中,对每一个command消息,框架内部都作了command的幂等处理。因此使用enode框架的应用,自身无需对command作幂等处理方面的考虑。

上面提到了并行消费和线性消费,其实对于offset的更新来讲是同样的,由于并行消费无非是多线程同时从SortedDictionary中移除消费成功的消息,而单线程只是单个线程去移除SortedDictionary中的消息。因此咱们要经过锁的机制,保证对SortedDictionary的操做是线程安全的。目前用了ReaderWriterLockSlim来实现对方法调用的线层安全。有兴趣的朋友能够去看一下代码。

最后,也是重点,呵呵。equeue目前尚未实现将失败的消息发回到broker的重试队列。这个功能之后会考虑加进去。

7.如何解决Broker的单点问题

这个问题比较复杂,目前equeue不支持broker的master-salve或master-master,而是单点的。我以为一个成熟的消息队列,为了确保在一个broker挂了的时候,要尽可能能确保有其余broker能够接替它,这样才能让消息队列服务器的可靠性。可是这个问题实在太复杂。rocketmq目前实现的也只是master-slave的方式。也就是只要主的master挂了,那producer就没法向broker发送消息了,由于slave的broker是只读的,不能直接接受新消息,slave的broker只能容许被consumer拉取消息。

这个问题,要讨论清楚,须要不少分布式方面的知识。因为篇幅的缘由,这里就不作讨论了,实际上我本身也搞不清楚到底该如何设计。但愿大牛们多多指点,如何实现broker的高可用哈!

相关文章
相关标签/搜索