假设当前目录为 /home/zk.配置3节点伪集群.node
1.下载shell
wget http://mirror.bit.edu.cn/apache/zookeeper/stable/apache-zookeeper-3.5.5-bin.tar.gz tar zxf apache-zookeeper-3.5.5-bin.tar.gz mv apache-zookeeper-3.5.5-bin zookeeper
2.建立zookeeper data目录apache
mkdir /home/zk/tmp/node-0/data -p mkdir /home/zk/tmp/node-0/datalog -p mkdir /home/zk/tmp/node-1/data -p mkdir /home/zk/tmp/node-1/datalog -p mkdir /home/zk/tmp/node-2/data -p mkdir /home/zk/tmp/node-2/datalog -p
3.修改配置文件bootstrap
cp zookeeper/conf/zoo_sample.cfg zookeeper/conf/zoo-node-0.cfg # 修改节点0配置 vi zookeeper/conf/zoo-node-0.cfg dataDir=/home/zk/tmp/node-0/data dataLogDir=/home/zk/tmp/node-0/datalog # server.index = ip:port-原子广播:port-选举 server.0=127.0.0.1:2887:3887 server.1=127.0.0.1:2888:3888 server.2=127.0.0.1:2889:3889 # 拷贝节点0配置 cp zookeeper/conf/zoo-node-0.cfg zookeeper/conf/zoo-node-1.cfg cp zookeeper/conf/zoo-node-0.cfg zookeeper/conf/zoo-node-2.cfg # 修改节点1配置 vi zookeeper/conf/zoo-node-1.cfg dataDir=/home/zk/tmp/node-1/data dataLogDir=/home/zk/tmp/node-1/datalog clientPort=2182 # 修改节点2配置 vi zookeeper/conf/zoo-node-2.cfg dataDir=/home/zk/tmp/node-2/data dataLogDir=/home/zk/tmp/node-2/datalog clientPort=2183 # 指定节点编号 echo 0 >> tmp/node-0/data/myid echo 1 >> tmp/node-1/data/myid echo 2 >> tmp/node-2/data/myid
4.启动服务器
cd zookeeper bin/zkServer.sh start conf/zoo-node-0.cfg bin/zkServer.sh start conf/zoo-node-1.cfg bin/zkServer.sh start conf/zoo-node-2.cfg # 查看状态 bin/zkServer.sh status conf/zoo-node-0.cfg bin/zkServer.sh status conf/zoo-node-1.cfg bin/zkServer.sh status conf/zoo-node-2.cfg
假设当前目录为 /home/kfkapp
1.下载socket
wget http://mirror.bit.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz tar zxf kafka_2.12-2.3.0.tgz # 或者设置软连接 mv kafka_2.12-2.3.0 kafka
2.建立日志目录ide
cd kafka mkdir tmp/kafka-logs-0 -p mkdir tmp/kafka-logs-1 -p mkdir tmp/kafka-logs-2 -p
3.修改配置文件ui
cp config/server.properties config/server-0.properties vi config/server-0.properties broker.id=0 listeners=PLAINTEXT://:9092 log.dirs=/home/kfk/kafka/tmp/kafka-logs-0 zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 # end vi cp config/server-0.properties config/server-1.properties cp config/server-0.properties config/server-2.properties vi config/server-1.properties broker.id=1 listeners=PLAINTEXT://:9093 log.dirs=/home/kfk/kafka/tmp/kafka-logs-1 # end vi vi config/server-2.properties broker.id=2 listeners=PLAINTEXT://:9094 log.dirs=/home/kfk/kafka/tmp/kafka-logs-2 # end vi
4.启动this
nohup bin/kafka-server-start.sh config/server-0.properties 1>/dev/null 2>&1 & nohup bin/kafka-server-start.sh config/server-1.properties 1>/dev/null 2>&1 & nohup bin/kafka-server-start.sh config/server-2.properties 1>/dev/null 2>&1 &
broker 配置
https://kafka.apache.org/documentation/#configuration
https://blog.csdn.net/memoordit/article/details/78850086
必要的配置
########################## Server Basics ########################### ## 服务节点id,整数 >=-1,集群内惟一 broker.id=0 #################### Socket Server Settings ######################### ## socket 服务器监听地址 listeners=PLAINTEXT://:9092 ########################## Log Basics ################################ ## 存储日志文件的目录列表,逗号分隔 log.dirs=/home/yukl/2_opensys/kafka/tmp/kafka-logs-0 ## 每一个topic的默认分区数 num.partitions=1 ##################### Internal Topic Settings ####################### ## offset topic 的复制因子 offsets.topic.replication.factor=1 ## 事务 topic 的复制因子 transaction.state.log.replication.factor=1 ## 事务 topic 的min.insync.replicas 配置 transaction.state.log.min.isr=1 ####################### Log Flush Policy ############################ ## 消息数达到目标刷新到磁盘 #log.flush.interval.messages=10000 ## 时间间隔达到目标刷新到磁盘 #log.flush.interval.ms=1000 ####################### Log Retention Policy ######################### ## 日志文件被删除前的保存时间(单位小时) log.retention.hours=168 ## 优先使用minutes配置,不然使用ms,若是这两项都没设置,则使用hours #log.retention.minutes #log.retention.ms ## 日志达到删除大小的阈值。每一个topic下每一个分区保存数据的最大文件大小;注意,这是每一个分区的上限,所以这个数值乘以分区的个数就是每一个topic保存的数据总量。同时注意:若是log.retention.hours和log.retention.bytes都设置了,则超过了任何一个限制都会形成删除一个段文件。注意,这项设置能够由每一个topic设置时进行覆盖。-1为不限制。 #log.retention.bytes=1073741824 ## topic 分区的日志存放在某个目录下诸多文件中,这些文件将partition的日志切分红一段一段的,这就是段文件(segment file);一个topic的一个分区对应的全部segment文件称为log。这个设置控制着一个segment文件的最大的大小,若是超过了此大小,就会生成一个新的segment文件。此配置能够被覆盖。 int log.segment.bytes=1073741824 ## 检查日志段文件的间隔时间,以肯定是否文件属性是否到达删除要求。 log.retention.check.interval.ms=300000 ############################# Zookeeper ############################# ## zookeeper 主机地址 zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 ## 与zookeeper创建链接的超时时间(ms) zookeeper.connection.timeout.ms=6000 ################### Group Coordinator Settings######################## # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance. # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms. # The default value for this is 3 seconds. # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. group.initial.rebalance.delay.ms=0 ## 消息中的时间戳取值为建立时间或者日志追加时间 ## 取值 CreateTime 或者 LogAppendTime ## 默认 CreateTime log.message.timestamp.type=CreateTime
producer 配置
https://kafka.apache.org/docu...
name | desc | type | default | valid values |
---|---|---|---|---|
acks | 生产者的请求反馈方式:0-不等待,仅仅将消息发送至缓冲区即返回,此模式下retries配置无效;1-等待leader写成功即返回;all或者-1 表示等待全部的follower写成功才返回。 | string | 1 | [all, -1, 0, 1] |
bootstrap.servers | 初始链接服务列表(host:port,host:port),是kafka集群的一个子集,用于发现全部的kafak集群 | list | "" | non-null string |
compression.type | 消息压缩模式,支持none, gzip, snappy, lz4, or zstd. | string | none | |
retries | 设置一个比零大的值,客户端若是发送失败则会从新发送。注意,这个重试功能和客户端在接到错误以后从新发送没什么不一样。若是max.in.flight.requests.per.connection没有设置为1,有可能改变消息发送的顺序,由于若是2个批次发送到一个分区中,并第一个失败了并重试,可是第二个成功了,那么第二个批次将超过第一个 | int | 2147483647 | [0,...,2147483647] |
client.id | 一个字符串,在请求时传递给服务器。用于追踪请求源 | string | "" | |
connections.max.idle.ms | 链接闲置时间 | long | 540000 | |
delivery.timeout.ms | 发送消息上报成功或失败的最大时间.注意,若是此时间在retries耗尽以前达到,则断定请求失败 | int | "" |
consumer 配置
https://kafka.apache.org/docu...
name | desc | type | default | valid values |
---|---|---|---|---|
bootstrap.servers | 同producer | |||
group.id | consumer group惟一标识 | string | ||
auto.offset.reset | earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 <br/>latest :当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 <br/>none :topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 | string | latest | [earliest,<br/>latest,<br/>none] |
connections.max.idle.ms | 同上 | |||
client.id | 同上 | |||
enable.auto.commit | 若是为true,消费者的offset后台会按期自动提交 | boolean | true | |
auto.commit.interval.ms | 消费者自动提交offset的周期(ms) | int | 5000 |