Pulsar Basics

Pulsar Basics

Pulsar architecture

Pulsar Basics-Pulsar architecture

架构上,一个pulsar cluster包含三个要素:ZooKeeper 集群、BookKeeper 集群和 broker 集群。其中BookKeeper做消息的存储,broker做消息的处理计算,pulsar依靠BookKeeper,实现了「存储计算分离」的架构,这是有别于其他MQ最大的一点。还有一个ZooKeeper集群主要是负责存储broker和BookKeeper的元数据,以及pulsar cluster的集群配置协调工作。

Subscription

Pulsar Basics-Subscription

  • Topic,Pub-Sub模式下的消息统一汇集地,不同生产者向topic发送消息,由broker分发到不同的订阅者
  • 单个topic的消息一般是由单个broker处理,这限制了topic的最大吞吐量,为了提高topic的消息处理能力,pulsar提供了partitioned topic的支持,这样使得某个topic可以被多个broker处理
  • 具体地,某一个topic有多个partition(可以理解为多个内部的topic),每个partition由不同的broker处理
  • 上图中的consumer是Exclusive独占型订阅模式,同一时刻只能有一个Consumer消费topic的消息,超过一个Consumer会收到错误

Subscription Modes

Pulsar Basics-Subscription Modes

  • 除了独占型(exclusive),Pulsar还有另外三种订阅模式,分别是共享(shared),故障转移/灾备(failover)和 键共享(key_shared)。
  • 故障转移/灾备(failover)模式:同一时刻只有一个有效的Consumer,其余的Consumer作为备用节点,在Master Consumer不可用后进行替代
  • 共享(shared)模式:
    • 可以同时存在多个Consumer,每个Consumer处理Topic中一部消息
    • 通过一定的机制分发给不同的消费者,比如轮询机制,每条消息仅会被分发给一个消费者
  • 键共享(key_shared)模式:类似于shared模式,唯一区别在于相同键(key)的消息会传递给同一个消费者,这个key需要在producer client 为每一条消息手动指定。这样一来,在不同的consumer上就能够保证消息的顺序。相当于同时兼顾了shared模式的比较高的消费能力和exclusive模式的有序性。

Topic(partition)

Pulsar Basics-Topic(partition)

  • 为了更好的理解pulsar的架构,需要先了解pulsar的存储逻辑模型,需要注意的是上图中的全是逻辑概念,不是物理概念,也就是pulsar不是按照一个个真实的partition或者ledger文件存储的,这个后面会有解释。
  • 一个partitioned topic有多个partition;每个partition由多个ledger组成;每个ledger由一个或多个fragment组成;每个fragment由多个entry组成;每个entry包含了一条或多条message,这取决于你是否采用批处理模式
  • Ledger作为最小的删除单元,也就是说我们不能删除单个entry而是去删除整个Ledger。Ledger在以下这些情况会发生滚动并创建新的Ledger:
    • 已达到Ledger的大小或时间限制。
    • Ledger的所有权(Pulsar Broker的所有权)发生变化。
  • Fragment是BookKeeper集群中最小的分布单元,什么时候会创建新的fragment呢?
    • 一是在新的ledger创建时,
    • 二是当前fragment使用的Bookies发生写入错误或超时
  • 每条message包含一系列信息:
    • Data: 消息数据,一般是bytes格式
    • key: 可选项,用于key_shared模式
    • Publish Time: producer生成的时间戳
    • Event Time: 可以由producer client手动指定的时间
    • Properties: 由producer client手动指定一条消息的属性,格式是key/value格式
    • MessageId: 全局唯一的ID,表征这条消息在这个topic中的位置
    • SchemaVersion: 这条消息的schema version
  • 重点介绍MessageId
    • 格式为ledger-id, entry-id, batch-index, partition-index,分别对应这条消息属于哪个ledger,哪个entry,batch中的位置,以及哪个partition
    • 如果这条消息是非batch, batch-index就是-1,如果这个topic是non-partitioned topic,则partition-index为-1

Cursor-Subscription State

Pulsar Basics-Cursor_ Subscription State

  • 一个Topic可以添加多个订阅。Pulsar使用游标(cursor)来跟踪订阅(subscription)的消费状态,图中为了简单明了的说明,省略了batch-index, partition-index,只用(ledger-id, entry-id)来表示订阅的消费位置。
  • cursor可以类比kafka的offset

Reader_ Non-durable Curcor

Pulsar Basics-Reader_ Non-durable Curcor

  • Pulsar还提供一个Reader接口,可以手动指定从某一个消息的MessageDd开始读取消息,你可以把它看作一个非持久化cursor的消费方式,cursor只在内存中保存,一旦连接中断,cursor就不存在了,
  • pulsar client Reader提供三种起始位置读取:
    • MessageId.earliest
    • MessageId.latest
    • MessageId

Tenant & Namespace

Pulsar Basics-Tenant & Namespace

  • Pulsar天然支持多租户,每个topic的全称分为四个层级:
    {persistent|non-persistent}://tenant/namespace/topic
  • 其中persistent代表持久消息存储,tenant代表租户;Namespace代表命名空间。可以把这样的层级看作一个公司由不同的部门(tenant),每个部门有不同的组(Namespace),每个组有不同的业务(topic),例如下一张图。
  • Namespace(命名空间)将相关联的 topic 作为一个组来管理,是管理 Topic 的基本单元。 大多数对 topic 的管理都是对Namespace的一项配置,比如消息保留策略,TTL等等。 每个tenant可以有多个Namespace。但现在也在慢慢开发在topic级别进行管理配置的能力。
  • Pulsar提供多种形式的topic格式,可以进行相应的简写,例如如果创建topic只定义了topicname:mytopic,则会自动创建persistent://public/default/mytopic

Pulsar Basics-Multi-tenant

Write Path

Pulsar Basics-Write Path

Read Path

Pulsar Basics-Read Path

BookKeeper 一致性

BookKeeper Ledger 的创建

前面提到,每个topic/partition由多个ledger组成,每个Ledger有三个关键配置:

  • Ensemble Size (E)
  • Write Quorum Size (Qw)
  • Ack Quorum Size (Qa)
  • 其中E指的是持久化这个ledger需要的一组bookie的数量,需要注意的是E不等同于Bookkeeper集群中所有bookie的数量,一般来说,E是小于等于这个总数的
  • Qw指的是该ledger需要写入的实际的bookie数量,可以等于或者小于E
  • Qa指的是确认写入Bookies的数量,为了一致性,Qa应该是:(Qw + 1) / 2 或者更大,这是为了避免单节点响应缓慢,只要保证大多数bookie写入成功,我们就视为持久化成功,此时broker就可以发送ack给producer了
  • 上图中就是E=5,Qw=3,Qa=2的情况
  • 若E中的某个bookie挂掉了,会从整个bookie集群中找一个新的bookie替换掉它,此时就发生了Ensemble Change
  • Pulsar还有一个Speculative Reads的机制,指的是发起读请求时,pulsar会先发送读请求到某一个bookie上,若延迟超过一定时间,则会发送读请求到另一个bookie,然后同时等待这两个bookie的返回结果,谁先返回就接受谁的读取数据。这样就可以将读延迟控制在有限的范围内。这是因为bookie节点都是对等的,而不是kafka那种主从机制,只有主节点负责读写,从节点只能复制数据。从而提高读写的效率

BookKeeper Ledger 多副本复制

BookKeeper Ledger 多副本复制(332)

E=3,Qw=3的情况下,每个bookie都保存有整个ledger的所有数据,这就类似于kafka,每个Partition副本都完整的存储在kafka节点上。

BookKeeper Ledger 多副本复制(532)

  • E=5,Qw=3的情况下,每个Entry会以轮询的方式滚动保存在不同的bookie(Qw)上,造成的结果就是ledger数据条带化的分布在不同的bookie上,每个bookie只保存了这个ledger的一部分数据。
  • 当存储出现瓶颈,需要做集群扩展时,只需添加更多Bookies,pulsar会在新的bookie创建新的ledger/Fragment,不用进行类似于kafka的Rebalance操作,这个过程中没有数据的复制搬移

BookKeeper 读写分离

BookKeeper 读写分离

  • 前面提到的Ledgers和Fragments是在Zookeeper中维护和跟踪的逻辑结构,物理上数据不存储在Ledgers和Fragments对应的文件中。
  • 下面讲一下BookKeeper的读写分离机制,图中蓝色线的写操作,红色线是读操作。
  • 先看写操作,首先,BookKeeper会先把数据同步刷盘写入到journal日志,保存在journal盘,文件是以时间顺序写入的,不保证单个ledger的有序。同时会写入wirite cache,然后通过聚合排序,使得同一个ledger的entry保证局部有序,定时异步刷盘写入到entry log,每条entry的位置索引保存在RocksDB,只是将(LedgerId,EntryId)映射到(EntryLogId,文件中的偏移量)。
  • 再看读操作,追尾读(tailing reads)时,读的都是最新文件,可以在Write cache命中,不用访问bookie;追赶读时,读的历史文件,读取时尝试读取写缓存,如果没有命中尝试读取读缓存。如果两者都没有命中,需要从RocksDB中查找entry的位置,然后在lentry log文件中读取该entry并且会更新到读缓存中以便后续请求命中缓存。这两层缓存意味着读取通常可以在内存中完成。
  • BookKeeper容许将磁盘IO做读写分离。写入都按时间顺序写入Journal日志文件可以存储在专用的磁盘上(推荐SSD),获得更高的同步刷盘性能。除此之外从写入操作来看没有其他的同步磁盘IO操作,数据都是写入到内存的缓存区。写缓存通过异步刷盘的方式批量将Entry写入到entry log和RocksDB,因此,一个磁盘用于同步写入Journal日志文件,另一个磁盘用于异步写入Entry log和读取操作。

Failure Handling

Pulsar Basics-Failure Handling

Review

概念2