最近正在研究rocketmq,简单记录下设计的不一样数据库
互联网系统中Rpc、服务治理、消息中间件基本都是标配,消息中间件能解耦,削峰,高可用并能间接提供达到最终一致性服务器
消息中间件中,消息消费分为最多一次,至少一次和恰好一次,若是须要实现恰好一次,则系统设计难度增大,系统性能损失增长,权衡利弊,rocket实现的是最少一次,消费端可能会重复接收消息(ACK模式下,ACK消息可能丢失),由消费端幂等消费网络
为何不用zk,仍是从实际需求出发,Topic路由信息无需在集群之间保持强一致性,最终一致便可,从而减小对zk的依赖和性能的损失并发
消息存储方面,rocket引入文件组,无限循环使用,commitlog文件每一个1G,以第一个偏移值为文件名,为了和consumequeue一致,log中还包含了tag,key等信息便于恢复,顺序写,引入内存映射,相同主题的消息被顺序存储在同一文件中,还提供定时清理等防止过分堆积,利用消费队列文件和索引文件及pagecache等提高读性能,ConsumeQueue是消息的逻辑队列,相似数据库的索引文件,存储的是指向物理存储的地址。每一个Topic下的每一个Message Queue都有一个对应的ConsumeQueue文件, 里面有一部分是存储了tag对应的hashcode,通过对比,符合要求的消息被从commitlog中读取出来,消息在消费前,会对比完整的Message Tag字符串,清除hash冲突形成的误读负载均衡
消息过滤,基于tag等,在存储设计上基于hash等方式提高过滤效率,能够从Broker或者消费端过滤,broker端过滤能够减小传递到消费端的消息,减小网络损失,消费端过滤能够由消费者任意定义性能
定时消息,若是要支持任意精度的定时消息消费,必须在消息服务端对消息进行排序,势必带来很大的性能损耗,rocketmq设计不支持任意进度的定时消息,只支持特定延迟级别this
客户端支持Push(被推送)、pull(自主控制messagequeue的遍历及消息的读取)两种模式spa
线程池设计,rocketmq会根据不一样的任务类型建立不一样的线程池,若是该类型没注册,则由other之类的线程池统一处理线程
Namesrv之间数据能够不一致,彼此之间互不通讯设计
消息发送端提供容错机制,这个地方以前我就有疑问,为何在客户端或者消费端获取消息存储meta信息以后,namesrv发现变化后不会通知他们。。。原来是由meta使用端的容错机制来保证高可用,下降namesrv的复杂性
消息的顺序性保证,若是要全局一致,必须单一topic,单一辈子产者及消费者,清除一切并发,可行性比较低,性能和吞吐量没法接受,结合业务,通常是部分顺序消息,发送端将同一业务ID的消息发送到同一个Message Queue,在消费过程当中,不并发处理
CommitLog同步,不是通过netty命令的方式,而是直接TCP链接,效率更高,链接成功后,经过对比master和slave的offset,不断进行同步
从broker得到的消息,由于是提交到线程池里并行执行,很难监控和控制执行状态,RocketMQ定义了一个快照类ProcessQueue来解决
负载均衡或消息分配是在消费者端代码中完成,Consumer从broker处获取全局消息,而后本身作负载均衡,只处理分给本身的部分
跟kafka同样,总的消费者数量不要超过topic的队列数,不然多余的消费者收不到消息
Namesrv自己无状态,其中的Broker,topic等状态信息不会持久存储,都是由各个角色按期上报并存储到内存中
事物消息的实现:发送方向RocketMQ发送“待确认”消息,RocketMQ将收到的“待确认”消息持久化后,向发送方回复消息已经发送成功,发送方开始执行本地事件逻辑,发送方根据本地事件逻辑想RocketMQ发送二次确认,RocketMQ收到commit状态则将第一阶段消息标记为可投递,订阅方将能收到该消息,收到rollback状态则删除第一阶段的消息,若是出现异常,服务器在一段时间后未收到确认消息,则服务器将对“待确认”消息发起回查请求,发送方收到回查请求后经过检查对应消息的本地事件执行结果返回对应的状态,RocketMQ收到后继续处理
服务端接受到新请求后,若是队列没有新消息,并不急于返回,经过一个循环不断查看状态,长轮询的核心是,broker端hold住客户端过来的请求一小段时间,在这个时间内有新消息到达,就利用现有的连接当即返回给消息的consumer,长轮询主动权仍是掌握在消费端手中,即便broker消息大量积压,也不会主动推送给消费者
在同步刷盘过程种,有一个设计,避免了任务提交与任务执行的锁冲突,因为避免同步刷盘消费任务与其余消费生产者提交任务直接的锁竞争,GroupCommitService提供读容器与写容器,这两个容器每执行完一次任务后,交互,继续消费任务。
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>(); private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>(); public synchronized void putRequest(final GroupCommitRequest request) { synchronized (this.requestsWrite) { this.requestsWrite.add(request); } if (hasNotified.compareAndSet(false, true)) { waitPoint.countDown(); // notify } } private void swapRequests() { List<GroupCommitRequest> tmp = this.requestsWrite; this.requestsWrite = this.requestsRead; this.requestsRead = tmp; }