在 Pulsar 中,Function、Source 和 Sink 都是运行在 Function Worker 上的,关于 Function 的内容能够参考一篇文章了解 Pulsar Functions,关于 Source 和 Sink 的使用能够参考web
本文介绍了 Functions Worker 的选举机制。阅读本文能够对 Functions Worker 有更加深刻的理解。docker
如下是 Pulsar Functions Worker 的架构图apache
架构图中的各个组件 Metadata Mananger、Scheduler Manager、Runtime Manager 和Membership Manager 都是运行在 Worker 上的。bash
咱们来了解下 Worker 的选举。架构
2.启动时,每一个 Worker 内部都会启动一个 Consumer,该 Consumer 用来进行选举,每一个 Worker 的 id,主机名以及端口号会和该 Consumer 绑定。
3.该 Consumer 基于 Failover 模式启动(关于 Consumer 的 Failover 订阅模式能够参考[这里](http://pulsar.apache.org/docs...)。链接到同一个 Topic,具备相同订阅名称的 Consumer 中同一时刻只有一个处于活跃状态。
4.使用该 Consumer 的 Worker 就是 Leader,它负责进行调度并处理一些其余操做。eclipse
Worker Service1 和 Worker Service2 为一个集群,同时链接到 Topic1, 具备相同的订阅名称 sub,Leader 会在它们两个 Worker 中产生。
Worker Service3 为另外一个集群,链接到 Topic2,订阅名称为 sub3,由于只有一个 Worker,因此它自己就是 Leader。测试
下面咱们来实践选举过程。spa
在搭建本次试验前,须要在电脑上安装如下依赖,本次试验是在 Mac 系统上进行的测试。3d
拉取 apachepulsar/pulsar:2.3.1 的镜像,而后使用 docker run
命令进行启动,-d
参数使该服务运行在后台模式,-it
以交互模式运行容器,并为容器分配一个伪输入终端。--name
指定该容器使用 pulsar-standalone-function-leader
这个名字。日志
docker run -d -it -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-standalone-function-leader apachepulsar/pulsar:2.3.1 bin/pulsar standalone
docker logs -f 11:17:17.363 [pulsar-web-55-4] INFO org.apache.pulsar.broker.web.PulsarWebResource - Successfully validated clusters on tenant [public] 11:17:17.369 [pulsar-web-55-4] INFO org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Created namespace public/default 11:17:17.370 [pulsar-web-55-4] INFO org.eclipse.jetty.server.RequestLog - 172.17.0.2 - - [18/May/2019:11:17:17 +0000] "PUT /admin/v2/namespaces/public/default HTTP/1.1" 204 0 "-" "Pulsar-Java-v2.3.1" 10 11:17:17.377 [pulsar-web-55-12] INFO org.apache.pulsar.broker.web.PulsarWebResource - Successfully validated clusters on tenant [public]
启动 Pulsar 后出现上面的日志说明服务启动成功。
Worker 的选举是基于 Consumer 的 Failover 模式,所以在本次测试中,直接启动 Consumer 来模拟选举。
如上选举图所示,本次测试会启动三个 Failover 模式的 Consumer,其中两个 Consumer 链接到 Topic1,使用的订阅名称为 sub,它们表明一个 Worker 集群;另外一个 Consumer 链接到 Topic2,使用订阅名称 sub3 表明另外一个 Worker 集群。
为了本次测试,须要开启四个窗口,分别命名为 window1,window2,window3,window4。
window4 用来进行验证。
window1
以 Failover 模式启动一个消费者,使用订阅名称 sub,订阅到 public/default/topic1
docker exec -it pulsar-standalone-function-leader /bin/bash ./bin/pulsar-client consume persistent://public/default/topic1 --num-messages 0 --subscription-name sub -t Failover
window2
以 Failover 模式启动一个消费者,使用订阅名称 sub,订阅到 public/default/topic1
docker exec -it pulsar-standalone-function-leader /bin/bash ./bin/pulsar-client consume persistent://public/default/topic1 --num-messages 0 --subscription-name sub -t Failover
window3
以 Failover 模式启动一个消费者,使用订阅名称 sub3,订阅到 public/default/topic2
docker exec -it pulsar-standalone-function-leader /bin/bash ./bin/pulsar-client consume persistent://public/default/topic2 --num-messages 0 --subscription-name sub3 -t Failover
三个窗口的操做都使用了相同的订阅模式 Failover,window1 和 window2 使用相同的订阅名称订阅到同一个 Topic。
window4
经过命令 topics stats 来获取 topic1 的统计信息
./bin/pulsar-admin topics stats topic1 { "msgRateIn" : 0.0, "msgThroughputIn" : 0.0, "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "averageMsgSize" : 0.0, "storageSize" : 0, "publishers" : [ ], "subscriptions" : { "sub" : { "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "msgRateRedeliver" : 0.0, "msgBacklog" : 0, "blockedSubscriptionOnUnackedMsgs" : false, "unackedMessages" : 0, "type" : "Failover", "activeConsumerName" : "383dc", "msgRateExpired" : 0.0, "consumers" : [ { "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "msgRateRedeliver" : 0.0, "consumerName" : "383dc", "availablePermits" : 1000, "unackedMessages" : 0, "blockedConsumerOnUnackedMsgs" : false, "metadata" : { }, "address" : "/127.0.0.1:50014", "connectedSince" : "2019-05-18T11:30:34.161Z", "clientVersion" : "2.3.1" }, { "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "msgRateRedeliver" : 0.0, "consumerName" : "51911", "availablePermits" : 1000, "unackedMessages" : 0, "blockedConsumerOnUnackedMsgs" : false, "metadata" : { }, "address" : "/127.0.0.1:50018", "connectedSince" : "2019-05-18T11:30:42.742Z", "clientVersion" : "2.3.1" } ] } }, "replication" : { }, "deduplicationStatus" : "Disabled" }
在 Topic1 下出现了一个订阅,两个 Consumer,当前的 activeConsumerName 是 383dc
,正是 window1 中的订阅。这说明当前持有该订阅的 Worker 为 Leader。
当把 window1 的订阅关掉后,再看一下。在 window1 中,使用 CTRL + C 关掉该 Consumer,再回到 window4 查看订阅。
./bin/pulsar-admin topics stats topic1 { "msgRateIn" : 0.0, "msgThroughputIn" : 0.0, "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "averageMsgSize" : 0.0, "storageSize" : 0, "publishers" : [ ], "subscriptions" : { "sub" : { "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "msgRateRedeliver" : 0.0, "msgBacklog" : 0, "blockedSubscriptionOnUnackedMsgs" : false, "unackedMessages" : 0, "type" : "Failover", "activeConsumerName" : "51911", "msgRateExpired" : 0.0, "consumers" : [ { "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "msgRateRedeliver" : 0.0, "consumerName" : "51911", "availablePermits" : 1000, "unackedMessages" : 0, "blockedConsumerOnUnackedMsgs" : false, "metadata" : { }, "connectedSince" : "2019-05-18T11:30:42.742Z", "clientVersion" : "2.3.1", "address" : "/127.0.0.1:50018" } ] } }, "replication" : { }, "deduplicationStatus" : "Disabled" }
能够看到 window2 中启动的订阅被成功激活,当前的 activeConsumerName 是 51911,这时持有该 Consumer 的 Worker 便成为了 Leader。
以上是在 Connector 运行 Worker 的选举机制,能够看到其很是巧妙的运用了 Consumer 的 Failover 模式,来实现 Worker 的高可用机制。