Confluent Platform 是一个流数据平台,可以组织管理来自不同数据源的数据,拥有稳定高效的系统。
Confluent Platform 不只提供数据传输的系统,还提供所有配套工具:连接数据源的工具、应用,以及数据接收端。
Confluent 是基于 Kafka 构建的软件,有两个版本:企业版(可免费试用 30 天)和开源版。
这里我们使用开源版的 Confluent Kafka,开源版包括以下组件:
完整工具以及免费试用组件请见官网: www.confluent.io/download
组件功能介绍: www.cnblogs.com/dadadecheng…
以下安装步骤基于 Linux(CentOS/RHEL):
yum install curl which -y
rpm --import https://packages.confluent.io/rpm/5.3/archive.key复制代码
配置 yum 源
cat > /etc/yum.repos.d/confluent.repo <<EOF
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.3/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.3/archive.key
enabled=1
[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.3
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.3/archive.key
enabled=1
EOF复制代码
安装仅包含社区组件的 Confluent Platform(Confluent Community components):
yum clean all && yum install confluent-community-2.12 -y
复制代码
去 Oracle 官网下载 JDK,上传到服务器,执行下面命令安装:
rpm -ivh jdk-8u231-linux-x64.rpm复制代码
Oracle JDK 1.8 下载页面: www.oracle.com/technetwork…
vim /etc/kafka/zookeeper.propertiesvim
tickTime=2000
dataDir=/var/lib/zookeeper/
clientPort=2181
initLimit=5
syncLimit=2
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888
autopurge.snapRetainCount=3
autopurge.purgeInterval=24复制代码
注: zoo1、zoo2、zoo3 为主机名,请提前在 hosts 中配置好。
echo "1" > /var/lib/zookeeper/myid #请在zoo1 机器上执行
echo "2" > /var/lib/zookeeper/myid #请在zoo2 机器上执行
echo "3" > /var/lib/zookeeper/myid #请在zoo3 机器上执行启复制代码
设置开机启动并启动zookeeper服务器
systemctl enable confluent-zookeeper && systemctl start confluent-zookeeper复制代码
查看状态
systemctl status confluent-zookeeper复制代码
查看日志
tail -f /var/log/messages复制代码
vim /etc/kafka/server.properties #修改如下2个选项 zoo
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181broker.id.generation.enable=true复制代码
设置开机启动并启动kafka
systemctl enable confluent-kafka && systemctl start confluent-kafka复制代码
查看状态
systemctl status confluent-kafka复制代码
查看日志
tail -f /var/log/messages复制代码
kafka-topics --create --bootstrap-server zoo1:9092,zoo2:9092,zoo3:9092 --replication-factor 1 --partitions 5 --topic kafka-test复制代码
kafka-console-producer --broker-list zoo1:9092,zoo2:9092,zoo3:9092 --topic kafkatest
复制代码
kafka-console-consumer --bootstrap-server zoo1:9092,zoo2:9092,zoo3:9092 --topic kafkatest --from-beginning复制代码
package main
import (
"fmt"
"log"
"time"
"github.com/Shopify/sarama"
)
func main() {
var address = []string{"ip1:9092","ip2:9092","ip3:9092"}
producer(address)
}
func producer(address []string) {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.Timeout = 5 * time.Second
p, err := sarama.NewSyncProducer(address, config)
if err != nil {
log.Println(err)
}
defer p.Close()
strKey := "key: "
srcValue := "testKafka: test message, index=%d"
log.Println("start")
for i := 0; i < 10000; i++ {
value := fmt.Sprintf(srcValue, i)
msg := &sarama.ProducerMessage{
Key: sarama.StringEncoder(strKey),
Topic: gotest,
Value: sarama.ByteEncoder(value),
}
part, offset, err := p.SendMessage(msg)
if err != nil {
log.Println(err, value, part, offset)
}
}
log.Println("end")
}
复制代码
1万条:
10万条:
速度还是很快的!