【Apache Pulsar】Apache Pulsar单机环境及Go语言开发环境搭建

 

0x01 简介

Apache Pulsar是一个开源的分布式发布-订阅消息系统,与Kafka相似,但比后者更增强大。Pulsar最初由Yahoo开发并维护,目前已经成为Apache软件组织的一个孵化子项目,当前最新版本号为2.1.0-incubating。官网地址:http://pulsar.apache.org/git

0x02 Apache Pulsar单机版环境搭建

一、前提条件

Pulsar目前仅仅支持MacOS和Linux系统,不支持Windows系统。而且要求系统中安装了Java 8环境。github

二、系统环境

咱们以CentOS系统做为搭建环境,系统为CentOS7.2。web

三、搭建步骤

首先,访问官网下载网页http://pulsar.apache.org/en/download/,以下图所示:apache

因为我本地环境的限制,因此本文中全部的文件下载都是首先在Windows系统中下载,而后手动拷贝到Linux服务器上的。vim

此处,咱们点击下载第一个,即二进制发布。而后拷贝到CentOS服务器上,并解压该压缩包,结果以下:服务器

进入对应解压获得的文件夹,该文件夹下文件以下:分布式

进入conf文件夹下,并使用vi或vim打开文件client.conf,修改里面的webServiceUrl和brokerServiceUrl字段中对应的IP为该服务器IP,以下所示(其中涂改部分为服务器IP):spa

保存并退出,而后进入到bin目录下,之后台运行模式启动pulsar服务,以下:.net

因为以前我已经启动了后台服务,因此上图中提示已经在运行该服务。
如此简单,pulsar单机版就这么顺利的运行起来了。然而,如何验证是否正常启动了呢?一种是经过查看日志文件来确保正常启动,此处略去这种方式。直接使用指令来验证是否正常启动:
(1)建立消费者consumer
指令:3d

$ ./pulsar-client consume -s "my-subscription" my-topic

含义:建立一个consumer,该consumer订阅的topic名称为my-topic,本订阅名称为my-subscription。建立成功会打印以下信息(只截图了部分信息):

建立成功后,该consumer就处于等待接收消息状态。

(2)建立生产者producer

$ ./pulsar-client produce my-topic --messages "test message from producer"

含义:建立一个producer,该producer对应的topic名称为my-topic(与上面建立的consumer订阅的topic相同),发送的消息由--messages指定,此处内容为“test message from producer”。建立成功会打印以下信息(只截图了部分信息):

此时,咱们会在1中建立的consumer端接收到producer发送的消息,以下图:

至此,说明咱们的pulsar服务正常运行。

0x03 Pulsar Go语言开发环境搭建

前提条件:开发电脑本地或Linux服务器中已经安装好了Go开发环境。

在Windows系统中开发Pulsar时须要安装GCC编译环境,因此须要安装MinGW,因为环境限制,这里我没法下载MinGW,因此就直接在CentOS系统中搭建开发环境了。

当前版本(2.1.0-incubating)下,Pulsar官方仅仅提供了C++、Java、Python、Go四种语言的客户端开发包。且四种语言的支持特性不尽相同,以下所示:

此外,还有一些第三方的客户端包,以下:

因为Pulsar Go客户端库是基于C++客户端库的,因此在安装Go库以前必需要确保已经成功安装了C++客户端库。

一、安装Pulsar C++客户端

在Pulsar C++客户端网页中,下载下图中所示的三个文件:

而后将下载的三个文件拷贝到CentOS服务器上,以下:

而后执行以下命令来安装这三个RPM包:

$ rpm -ivh apache-pulsar-client*.rpm

此处暂且先不验证是否安装成功。

二、安装Pulsar Go客户端

因为我环境所限制,没法使用go get的方式来下载Pulsar的Go语言包,我是直接在GitHub上面下载的incubator-pulsar-branch-2.1.zip,解压该文件获得以下内容:

此处,咱们仅仅须要里面的pulsar-client-go文件夹里面的内容,根据官网上的示例程序可知该go语言包的路径以下:

因此咱们将pulsar-client-go拷贝到CentOS服务器上$GOPATH/src/github.com/apache/incubator-pulsar下,若是中间文件夹不存在就本身建立,最终以下:

到此,Pulsar Go客户端包安装完成。

此时,咱们使用一个简单的Pulsar Go程序来验证上面安装是否正常,程序内容以下:

 1 package main
 2 
 3 import (
 4     "fmt"
 5     "runtime"
 6     "context"
 7     "github.com/apache/incubator-pulsar/pulsar-client-go/pulsar"
 8     "log"
 9 )
10 
11 func main (){
12     fmt.Println("Pulsar Producer")
13 
14     ctx := context.Background()
15 
16     //实例化Pulsar client
17     client,err := pulsar.NewClient(pulsar.ClientOptions{
18         URL:"pulsar://xx.xx.xx.xx:6650",  //xx.xx.xx.xx表明Pulsar IP
19         OperationTimeoutSeconds:5,
20         MessageListenerThreads:runtime.NumCPU()/2,
21     })
22 
23     if err !=  nil {
24         log.Fatalf("Could not instantiate Pulsar client:%v",err)
25     }
26 
27 
28     // 建立producer
29     producer,err := client.CreateProducer(pulsar.ProducerOptions{
30         Topic:"my-topic",
31     })
32 
33     if err != nil {
34         log.Fatalf("Could not instantiate Pulsar producer:%v",err)
35     }
36 
37     defer producer.Close()
38 
39     msg := pulsar.ProducerMessage{
40         Payload:[]byte("Hello,This is a message from Pulsar Producer!"),
41     }
42 
43     if err := producer.Send(ctx,msg);err != nil {
44         log.Fatalf("Producer could not send message:%v",err)
45     }
46 
47 }

编译并运行,结果以下:

但其实咱们查看/usr/lib路径下发现,实际上是存在libpulsar.so.2.1.0-incubating这个库文件的,因此应该是系统没有引用到这个路径:

因此,须要将该文件所在的路径添加到到/etc/ld.so.conf中,以下:

此时,再次运行程序时则成功:

0x04 Pulsar producer和consumer示例程序

此处直接给出代码,里面有必要的注释:

一、producer

文件:pulsar-producer.go

 1 package main
 2 
 3 import (
 4     "fmt"
 5     "runtime"
 6     "context"
 7     "github.com/apache/incubator-pulsar/pulsar-client-go/pulsar"
 8     "log"
 9 )
10 
11 func main (){
12     fmt.Println("Pulsar Producer")
13 
14     ctx := context.Background()
15 
16     //实例化Pulsar client
17     client,err := pulsar.NewClient(pulsar.ClientOptions{
18         URL:"pulsar://xx.xx.xx.xx:6650",  // xx.xx.xx.xx表明Pulsar IP
19         OperationTimeoutSeconds:5,
20         MessageListenerThreads:runtime.NumCPU()/2,
21     })
22 
23     if err !=  nil {
24         log.Fatalf("Could not instantiate Pulsar client:%v",err)
25     }
26 
27 
28     // 建立producer
29     producer,err := client.CreateProducer(pulsar.ProducerOptions{
30         Topic:"my-topic",
31     })
32 
33     if err != nil {
34         log.Fatalf("Could not instantiate Pulsar producer:%v",err)
35     }
36 
37     defer producer.Close()
38 
39     msg := pulsar.ProducerMessage{
40         Payload:[]byte("Hello,This is a message from Pulsar Producer!"),
41     }
42 
43     if err := producer.Send(ctx,msg);err != nil {
44         log.Fatalf("Producer could not send message:%v",err)
45     }
46 
47 }

二、consumer

文件:pulsar-consumer.go

 1 package main
 2 
 3 import (
 4     "fmt"
 5     "github.com/apache/incubator-pulsar/pulsar-client-go/pulsar"
 6     "log"
 7     "context"
 8 )
 9 
10 func main()  {
11     fmt.Println("Pulsar Consumer")
12 
13     //实例化Pulsar client
14     client,err := pulsar.NewClient(pulsar.ClientOptions{
15         URL:"pulsar://xx.xx.xx.xx:6650", // xx.xx.xx.xx表明Pulsar IP
16     })
17 
18     if err != nil {
19         log.Fatal(err)
20     }
21 
22     //使用client对象实例化consumer
23     consumer,err := client.Subscribe(pulsar.ConsumerOptions{
24         Topic:"my-topic",
25         SubscriptionName:"sub-demo",
26     })
27 
28     if err != nil {
29         log.Fatal(err)
30     }
31 
32     defer consumer.Close()
33 
34     ctx := context.Background()
35 
36     //无限循环监听topic
37     for {
38         msg,err := consumer.Receive(ctx)
39         if err != nil {
40             log.Fatal(err)
41         } else {
42             fmt.Printf("Received message : %v",string(msg.Payload()))
43         }
44 
45         consumer.Ack(msg)
46         
47     }
48 
49 }

这两个go文件分别处于两个项目中,其项目结构分别以下:

而后,分别编译这两个go项目,并生成可执行文件。首先运行pulsar-consumer,打开消费者程序,此时该消费者程序处于监听消息状态,以下:

而后,运行pulsar-producer,打开生产者程序,以下:

该生产者程序发送完一条消息以后即运行结束并退出。
此时,再回到消费者程序运行界面,就能够看到消费者这边已经接收到了生产者发送的那条消息:

到此,Go语言版本的最简单的producer和consumer代码就完成了。

0x05 参考连接

相关文章
相关标签/搜索