confluent-kafka集群搭建

1.   什么是confluent-kafka?

Confluent Platform 是一个流数据平台,可以组织管理来自不一样数据源的数据,拥有稳定高效的系统。
Confluent Platform 不只提供数据传输的系统, 还提供全部的工具:链接数据源的工具,应用, 以及数据接收。 
Confluent是基于Kafka构造的软件,他有一个企业版 试用30天,一个是开源版复制代码

这里咱们使用开源版的 confluent-kafka, 开源版包括如下组件:html


完整工具以避免费试用组件请见官网:  www.confluent.io/downloadjava

组件功能介绍:  www.cnblogs.com/dadadecheng…
linux

2.  开始搭建 confluent-kafka 

2.1 安装最新版本 confluent-kafka 

yum install curl which -y
rpm --import https://packages.confluent.io/rpm/5.3/archive.key复制代码

配置yum源git

cat > /etc/yum.repos.d/confluent.repo <<EOF
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.3/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.3/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.3
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.3/archive.key
enabled=1
EOF复制代码

Confluent Platform using only Confluent Community components:
github

yum clean all &&  yum install confluent-community-2.12 -y
复制代码

2.2  安装jdk1.8

去oracle官网下载jdk, 上传到服务器,执行下面命令安装:golang

rpm -ivh jdk-8u231-linux-x64.rpm复制代码

oracle jdk1.8下载页面:  www.oracle.com/technetwork…bootstrap

2.3  配置zookeeper

vim /etc/kafka/zookeeper.propertiesvim

tickTime=2000
dataDir=/var/lib/zookeeper/
clientPort=2181
initLimit=5
syncLimit=2
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888
autopurge.snapRetainCount=3
autopurge.purgeInterval=24复制代码

注*  zoo1 zoo2 zoo3 为主机名,请提早配置好hostsbash

echo "1" > /var/lib/zookeeper/myid   #请在zoo1 机器上执行
echo "2" > /var/lib/zookeeper/myid   #请在zoo2 机器上执行
echo "3" > /var/lib/zookeeper/myid   #请在zoo3 机器上执行启复制代码

设置开机启动并启动zookeeper服务器

systemctl enable confluent-zookeeper && systemctl start confluent-zookeeper复制代码

查看状态

systemctl status confluent-zookeeper复制代码

查看日志

tail -f /var/log/messages复制代码

2.3  配置kafka

vim /etc/kafka/server.properties  #修改如下2个选项 zoo

zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181broker.id.generation.enable=true复制代码

设置开机启动并启动kafka

systemctl enable confluent-kafka && systemctl start confluent-kafka复制代码

查看状态

systemctl status confluent-kafka复制代码

查看日志

tail -f /var/log/messages复制代码


3. 测试kafka生产消息

3.1 建立topic 

kafka-topics --create --bootstrap-server zoo1:9092,zoo2:9092,zoo3:9092 --replication-factor 1 --partitions 5 --topic kafka-test复制代码

3.2 生产消息

kafka-console-producer --broker-list zoo1:9092,zoo2:9092,zoo3:9092 --topic kafkatest
复制代码

3.3 消费消息

kafka-console-consumer --bootstrap-server zoo1:9092,zoo2:9092,zoo3:9092 --topic kafkatest --from-beginning复制代码


4. 使用golang demo测试生产消息消耗的时间

package main

import (
	"fmt"	
	"log"
	"time"
        "github.com/Shopify/sarama"
)

func main()  {
    var address = []string{"ip1:9092","ip2:9092","ip3:9092"}
    producer(address)
}

func producer(address []string) {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Timeout = 5 * time.Second
	p, err := sarama.NewSyncProducer(address, config)
	if err != nil {
		log.Println(err)
	}
	defer p.Close()
	strKey := "key: "
	srcValue := "testKafka: test message, index=%d"
	log.Println("start")
	for i := 0; i < 10000; i++ {
		value := fmt.Sprintf(srcValue, i)
		msg := &sarama.ProducerMessage{
			Key:   sarama.StringEncoder(strKey),
			Topic: gotest,
			Value: sarama.ByteEncoder(value),
		}
		part, offset, err := p.SendMessage(msg)
		if err != nil {
			log.Println(err, value, part, offset)
		}
	}
	log.Println("end")
}
复制代码

4.1 10000和100000条所消耗的时间

1万条:


10万条:


速度仍是很快的!

相关文章
相关标签/搜索