一般,生产环境应该将Kafka集群部署在Linux操做系统上,缘由以下:
(1)Kafka客户端底层使用了Java的selector,selector在Linux上的实现机制是epoll,而在Windows平台上的实现机制是select,所以Kafka部署在Linux上可以得到更高效的I/O性能。
(2)网络传输效率的差异。Kafka须要在磁盘和网络间进行大量数据传输,在Linux部署Kafka可以享受到零拷贝(Zero Copy)技术所带来的快速数据传输特性。
(3)社区的支持度。Apache Kafka社区目前对Windows平台上发现的Kafka Bug不作任何承诺。java
(1)Kafka实现了冗余机制来提供高可靠性,并经过分区机制在软件层面实现负载均衡,所以Kafka的磁盘存储能够不使用磁盘阵列(RAID),使用普通磁盘组成存储空间便可。
(2)使用机械磁盘可以胜任Kafka线上环境,但SSD显然性能更好。git
规划磁盘容量时须要考虑:新增消息数、消息留存时间、平均消息大小、备份数、是否启用压缩等因素。
假设公司业务天天须要向Kafka集群发送100000000条消息,每条消息保存两份以防止数据丢失,消息默认保存7天时间,消息的平均大小是1KB,Kafka的数据压缩比是0.75。
天天100000000条1KB大小的消息,保存两份,压缩比0.75,占用空间大小就等于150GB(100000000*1KB*2/1000/1000*0.75
),考虑到Kafka集群的索引数据等,须要预留出10%的磁盘空间,所以天天总存储容量是165GB。数据留存7天,所以规划磁盘容量为1155GB(165GB*7
)。github
假设公司的机房环境是千兆网络,即1Gbps,业务须要在1小时内处理1TB的业务数据。假设Kafka Broker会用到70%的带宽资源,超过70%的阈值可能网络丢包,单台Kafka Broker最多能使用大约700Mb的带宽资源,但一般须要再额外为其它服务预留出2/3的资源,即Kafka Broker能够为Kafka服务分配带宽240Mbps(700Mb/3)。1小时处理1TB数据,则每秒须要处理2336Mb(1024*1024*8/3600
)数据,除以240,约等于10台服务器。若是还须要额外复制两份,那么服务器台数还要乘以3,即30台。web
Broker端参数也被称为静态参数(Static Configs),静态参数只能在Kafka的配置文件server.properties中进行设置,必须重启Broker进程才能生效。
log.dirs:指定Broker须要使用的若干个文件目录路径,没有默认值,必须指定。在生产环境中必定要为log.dirs配置多个路径,若是条件容许,须要保证目录被挂载到不一样的物理磁盘上。优点在于,提高读写性能,多块物理磁盘同时读写数据具备更高的吞吐量;可以实现故障转移(Failover),Kafka 1.1版本引入Failover功能,坏掉磁盘上的数据会自动地转移到其它正常的磁盘上,并且Broker还能正常工做,基于Failover机制,Kafka能够舍弃RAID方案。
zookeeper.connect:CS格式参数,能够指定值为zk1:2181,zk2:2181,zk3:2181,不一样Kafka集群能够指定:zk1:2181,zk2:2181,zk3:2181/kafka1,chroot只须要写一次。
listeners:设置内网访问Kafka服务的监听器。
advertised.listeners:设置外网访问Kafka服务的监听器。
auto.create.topics.enable:是否容许自动建立Topic。
unclean.leader.election.enable:是否容许Unclean Leader 选举。
auto.leader.rebalance.enable:是否容许按期进行Leader选举,生产环境中建议设置成false。
log.retention.{hours|minutes|ms}:控制一条消息数据被保存多长时间。优先级:ms设置最高、minutes次之、hours最低。
log.retention.bytes:指定Broker为消息保存的总磁盘容量大小。message.max.bytes:控制Broker可以接收的最大消息大小。算法
若是同时设置了Topic级别参数和全局Broker参数,Topic级别参数会覆盖全局Broker参数,而每一个Topic都能设置本身的参数值。
生产环境中,应当容许不一样部门的Topic根据自身业务须要,设置本身的留存时间。若是只能设置全局Broker参数,那么势必要提取全部业务留存时间的最大值做为全局参数值,此时设置Topic级别参数对Broker参数进行覆盖就是一个不错的选择。
retention.ms:指定Topic消息被保存的时长,默认是7天,只保存最近7天的消息,会覆盖掉Broker端的全局参数值。
retention.bytes:指定为Topic预留多大的磁盘空间。一般在多租户的Kafka集群中使用,默认值是 -1,表示能够无限使用磁盘空间。
max.message.bytes:指定Kafka Broker可以正常接收Topic 的最大消息大小。
Topic级别参数能够在建立Topic时进行设置,也能够在修改Topic 时设置,推荐在修改Topic时进行设置,Apache Kafka社区将来可能统一使用kafka-configs脚原本设置Topic级别参数。docker
Kafka 2.0.0版本已经正式摒弃对Java 7的支持。
Kafka Broker在与客户端进行交互时会在JVM堆上建立大量的Byte Buffer实例,所以JVM端设置的Heap Size不能过小,建议设置6GB。export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g
JVM端配置的一个重要参数是垃圾回收器的设置。对于Java 7,若是Broker所在机器的CPU资源很是充裕,建议使用CMS收集器。启用方法是指定-XX:+UseCurrentMarkSweepGC。不然,使用吞吐量收集器,开启方法是指定-XX:+UseParallelGC。对于Java 9,用默认的G1收集器,在没有任何调优的状况下,G1表现得要比CMS出色,主要体如今更少的Full GC,须要调整的参数更少等,因此使用G1就好。export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true
bootstrap
件描述符限制:ulimit -n。建议设置成一个超大的值,如ulimit -n 1000000。
文件系统类型:文件系统类型的选择。根据官网的测试报告,XFS 的性能要强于ext4。
Swappiness:推荐设置为一个较小值,如1。若是将swap设置为0,将会彻底禁止Kafka Broker进程使用swap空间;当物理内存耗尽时,操做系统会触发OOM killer组件,随机挑选一个进程kill掉,不给用户任何预警。若是设置一个比较小值,当开始使用swap空间时,Broker性能会出现急剧降低,从而给进一步调优和诊断问题的时间。
提交时间:提交时间(Flush落盘时间)。向Kafka发送数据并非真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操做系统的页缓存(Page Cache)上就认为写入成功,随后操做系统根据LRU算法会按期将页缓存上的脏数据落盘到物理磁盘上。页缓存数据写入磁盘的周期由提交时间来肯定,默认是5秒,能够适当地增长提交间隔来下降物理磁盘的写操做。若是在页缓存中的数据在写入到磁盘前机器宕机,数据会丢失,但鉴于Kafka在软件层面已经提供了多副本的冗余机制,拉大提交间隔换取性能是一个合理的作法。缓存
安装Docker:sudo yum install docker
启动Docker:sudo systemctl start docker
docker版本检查:docker version
安全
docker-compose下载:sudo curl -L https://github.com/docker/compose/releases/download/1.23.0-rc3/docker-compose-
uname -s-
uname -m-o /usr/local/bin/docker-compose
docker-compose安装:sudo chmod +x /usr/local/bin/docker-compose
docker-compose版本检查:docker-compose version
bash
zookeeper镜像选择:docker search zookeeper
选择star最多的镜像:docker.io/zookeeper
Kafka镜像选择:docker search kafka
选择star最多的镜像:docker.io/wurstmeister/kafka
kafka-manager镜像选择:docker search kafka-manager
选择镜像:kafkamanager/kafka-manager
# 单机 zookeeper + kafka + kafka-manager集群 version: '2' services: # 定义zookeeper服务 zookeeper-test: image: zookeeper # zookeeper镜像 restart: always hostname: zookeeper-test ports: - "12181:2181" # 宿主机端口:docker内部端口 container_name: zookeeper-test # 容器名称 # 定义kafka服务 kafka-test: image: wurstmeister/kafka # kafka镜像 restart: always hostname: kafka-test ports: - "9092:9092" # 对外暴露端口号 - "9999:9999" # 对外暴露JMX_PORT environment: KAFKA_ADVERTISED_HOST_NAME: 192.168.0.105 # KAFKA_ADVERTISED_PORT: 9092 # KAFKA_ZOOKEEPER_CONNECT: zookeeper-test:2181 # zookeeper服务 KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000 # zookeeper链接超时 KAFKA_LOG_CLEANUP_POLICY: "delete" KAFKA_LOG_RETENTION_HOURS: 120 # 设置消息数据保存的最长时间为120小时 KAFKA_MESSAGE_MAX_BYTES: 10000000 # 消息体的最大字节数 KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000 # KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000 # KAFKA_NUM_PARTITIONS: 1 # 分区数量 KAFKA_DELETE_RETENTION_MS: 10000 # KAFKA_BROKER_ID: 1 # kafka的ID KAFKA_COMPRESSION_TYPE: lz4 KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.0.105 -Dcom.sun.management.jmxremote.rmi.port=9999" # 导入KAFKA_JMX_OPTS环境变量 JMX_PORT: 9999 # 导入JMX_PORT环境变量 depends_on: - zookeeper-test # 依赖 container_name: kafka-test # 定义kafka-manager服务 kafka-manager-test: image: kafkamanager/kafka-manager # kafka-manager镜像 restart: always container_name: kafka-manager-test hostname: kafka-manager-test ports: - "9000:9000" # 对外暴露端口,提供web访问 depends_on: - kafka-test # 依赖 environment: ZK_HOSTS: zookeeper-test:2181 # 宿主机IP KAFKA_BROKERS: kafka-test:9090 # kafka KAFKA_MANAGER_AUTH_ENABLED: "true" # 开启安全认证 KAFKA_MANAGER_USERNAME: kafka-manager # Kafka Manager登陆用户 KAFKA_MANAGER_PASSWORD: 123456 # Kafka Manager登陆密码
须要确认相应端口是否被占用。
建立kafka目录,将docker-compose.yml文件放入kafka目录,在kafka目录执行命令。
启动:docker-compose up -d
关闭:docker-compose down
进入docker容器:docker exec -it kafka /bin/bash
建立Topic:kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic test
查看Topic:kafka-topics.sh --list --zookeeper zookeeper:2181
生产消息:kafka-console-producer.sh --broker-list kafka:9092 --topic test
消费消息:kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test --from-beginning
打开两个Terminal,一个执行生产消息的命令,一个执行消费消息的命令,每生产一条消息时消费消息Terminal就会显示一条消息,实现消息队列。
wurstmeister/kafka镜像中,kafka安装在/opt目录下,进入/opt目录,kafka_2.12-2.4.0目录即为kafka安装目录。
Scala版本:2.12
Kafka版本:2.4
Web方式访问:http://127.0.0.1:9000
docker rm -f $(docker ps -a --filter status=dead -q |head -n 1)
报错信息:ERROR: for f78856fb92e9_zoo1 Driver overlay2 failed to remove root filesystem f78856fb92e97f75ff4c255077de544b39351a4a2a3319737ada2a54df568032: remove /var/lib/docker/overlay2/2c257b8071b6a3d79e216838522f76ba7263d466a470dc92cdbef25c4dd04dc3/merged: device or resource busy
grep docker /proc/*/mountinfo|grep containerid | awk -F ":" '{print $1}' | awk -F "/" '{print $3}'
sudo kill -9 3119
报错信息:Error response from daemon: Container 9b3f9af8a1196f2ad3cf74fe2b1eeb7ccbd231fe2a93ec09f594d3a0fbb5783c is restarting, wait until the container is running
错误缘由:
docker-compose.yml文件对kafka服务配置restart: always,若是kafka服务启动失败会一直重启,能够经过docker logs kafka查看kafka服务启动的日志信息,查找错误缘由。
############################# System ###################### # 惟一标识在集群中的ID,要求是正数。 broker.id = 0 # 服务端口,默认9092 port = 9092 # 监听地址,不设为全部地址 host.name = debugo01 # 处理网络请求的最大线程数 num.network.threads = 2 # 处理磁盘I/O的线程数 num.io.threads = 8 # 一些后台线程数 background.threads = 4 # 等待IO线程处理的请求队列最大数 queued.max.requests = 500 # socket的发送缓冲区(SO_SNDBUF) socket.send.buffer.bytes = 1048576 # socket的接收缓冲区 (SO_RCVBUF) socket.receive.buffer.bytes = 1048576 # socket请求的最大字节数。为了防止内存溢出,message.max.bytes必然要小于 socket.request.max.bytes = 104857600 ############################# Topic ######################## # 每一个topic的分区个数,更多的partition会产生更多的segment file num.partitions = 2 # 是否容许自动建立topic ,如果false,就须要经过命令建立topic auto.create.topics.enable = true # 一个topic ,默认分区的replication个数 ,不能大于集群中broker的个数。 default.replication.factor = 1 # 消息体的最大大小,单位是字节 message.max.bytes = 1000000 ############################# ZooKeeper #################### # Zookeeper quorum设置。若是有多个使用逗号分割 zookeeper.connect = debugo01:2181, debugo02, debugo03 # 链接zk的超时时间 zookeeper.connection.timeout.ms = 1000000 # ZooKeeper集群中leader和follower之间的同步实际 zookeeper.sync.time.ms = 2000 ############################# Log ######################### # 日志存放目录,多个目录使用逗号分割 log.dirs = / var / log / kafka # 当达到下面的消息数量时,会将数据flush到日志文件中。默认10000 # log.flush.interval.messages=10000 # 当达到下面的时间(ms)时,执行一次强制的flush操做。interval.ms和interval.messages不管哪一个达到,都会flush。默认3000ms # log.flush.interval.ms=1000 # 检查是否须要将日志flush的时间间隔 log.flush.scheduler.interval.ms = 3000 # 日志清理策略(delete|compact) log.cleanup.policy = delete # 日志保存时间 (hours|minutes),默认为7天(168小时)。超过这个时间会根据policy处理数据。bytes和minutes不管哪一个先达到都会触发。 log.retention.hours = 168 # 日志数据存储的最大字节数。超过这个时间会根据policy处理数据。 # log.retention.bytes=1073741824 # 控制日志segment文件的大小,超出该大小则追加到一个新的日志segment文件中(-1表示没有限制) log.segment.bytes = 536870912 # 当达到下面时间,会强制新建一个segment log.roll.hours = 24 * 7 # 日志片断文件的检查周期,查看它们是否达到了删除策略的设置(log.retention.hours或log.retention.bytes) log.retention.check.interval.ms = 60000 # 是否开启压缩 log.cleaner.enable = false # 对于压缩的日志保留的最长时间 log.cleaner.delete.retention.ms = 1 day # 对于segment日志的索引文件大小限制 log.index.size.max.bytes = 10 * 1024 * 1024 # y索引计算的一个缓冲区,通常不须要设置。 log.index.interval.bytes = 4096 ############################# replica ####################### # partition management controller 与replicas之间通信的超时时间 controller.socket.timeout.ms = 30000 # controller-to-broker-channels消息队列的尺寸大小 controller.message.queue.size = 10 # replicas响应leader的最长等待时间,如果超过这个时间,就将replicas排除在管理以外 replica.lag.time.max.ms = 10000 # 是否容许控制器关闭broker ,如果设置为true,会关闭全部在这个broker上的leader,并转移到其余broker controlled.shutdown.enable = false # 控制器关闭的尝试次数 controlled.shutdown.max.retries = 3 # 每次关闭尝试的时间间隔 controlled.shutdown.retry.backoff.ms = 5000 # 若是relicas落后太多,将会认为此partition relicas已经失效。而通常状况下,由于网络延迟等缘由,总会致使replicas中消息同步滞后。若是消息严重滞后,leader将认为此relicas网络延迟较大或者消息吞吐能力有限。在broker数量较少,或者网络不足的环境中,建议提升此值. replica.lag.max.messages = 4000 # leader与relicas的socket超时时间 replica.socket.timeout.ms = 30 * 1000 # leader复制的socket缓存大小 replica.socket.receive.buffer.bytes = 64 * 1024 # replicas每次获取数据的最大字节数 replica.fetch.max.bytes = 1024 * 1024 # replicas同leader之间通讯的最大等待时间,失败了会重试 replica.fetch.wait.max.ms = 500 # 每个fetch操做的最小数据尺寸,若是leader中还没有同步的数据不足此值,将会等待直到数据达到这个大小 replica.fetch.min.bytes = 1 # leader中进行复制的线程数,增大这个数值会增长relipca的IO num.replica.fetchers = 1 # 每一个replica将最高水位进行flush的时间间隔 replica.high.watermark.checkpoint.interval.ms = 5000 # 是否自动平衡broker之间的分配策略 auto.leader.rebalance.enable = false # leader的不平衡比例,如果超过这个数值,会对分区进行从新的平衡 leader.imbalance.per.broker.percentage = 10 # 检查leader是否不平衡的时间间隔 leader.imbalance.check.interval.seconds = 300 # 客户端保留offset信息的最大空间大小 offset.metadata.max.bytes = 1024 #############################Consumer ##################### # Consumer端核心的配置是group.id、zookeeper.connect # 决定该Consumer归属的惟一组ID,By setting the same group id multiple processes indicate that they are all part of the same consumer group. group.id # 消费者的ID,如果没有设置的话,会自增 consumer.id # 一个用于跟踪调查的ID ,最好同group.id相同 client.id = < group_id > # 对于zookeeper集群的指定,必须和broker使用一样的zk配置 zookeeper.connect = debugo01:2182, debugo02: 2182, debugo03: 2182 # zookeeper的心跳超时时间,查过这个时间就认为是无效的消费者 zookeeper.session.timeout.ms = 6000 # zookeeper的等待链接时间 zookeeper.connection.timeout.ms = 6000 # zookeeper的follower同leader的同步时间 zookeeper.sync.time.ms = 2000 # 当zookeeper中没有初始的offset时,或者超出offset上限时的处理方式 。 # smallest :重置为最小值 # largest:重置为最大值 # anything else:抛出异常给consumer auto.offset.reset = largest # socket的超时时间,实际的超时时间为max.fetch.wait + socket.timeout.ms. socket.timeout.ms = 30 * 1000 # socket的接收缓存空间大小 socket.receive.buffer.bytes = 64 * 1024 # 从每一个分区fetch的消息大小限制 fetch.message.max.bytes = 1024 * 1024 # true时,Consumer会在消费消息后将offset同步到zookeeper,这样当Consumer失败后,新的consumer就能从zookeeper获取最新的offset auto.commit.enable = true # 自动提交的时间间隔 auto.commit.interval.ms = 60 * 1000 # 用于消费的最大数量的消息块缓冲大小,每一个块能够等同于fetch.message.max.bytes中数值 queued.max.message.chunks = 10 # 当有新的consumer加入到group时,将尝试reblance,将partitions的消费端迁移到新的consumer中, 该设置是尝试的次数 rebalance.max.retries = 4 # 每次reblance的时间间隔 rebalance.backoff.ms = 2000 # 每次从新选举leader的时间 refresh.leader.backoff.ms # server发送到消费端的最小数据,如果不知足这个数值则会等待直到知足指定大小。默认为1表示当即接收。 fetch.min.bytes = 1 # 如果不知足fetch.min.bytes时,等待消费端请求的最长等待时间 fetch.wait.max.ms = 100 # 若是指定时间内没有新消息可用于消费,就抛出异常,默认-1表示不受限 consumer.timeout.ms = -1 #############################Producer###################### # 核心的配置包括: # metadata.broker.list # request.required.acks # producer.type # serializer.class # 消费者获取消息元信息(topics, partitions and replicas)的地址,配置格式是:host1:port1,host2:port2,也能够在外面设置一个vip metadata.broker.list # 消息的确认模式 # 0:不保证消息的到达确认,只管发送,低延迟可是会出现消息的丢失,在某个server失败的状况下,有点像TCP # 1:发送消息,并会等待leader 收到确认后,必定的可靠性 # -1:发送消息,等待leader收到确认,并进行复制操做后,才返回,最高的可靠性 request.required.acks = 0 # 消息发送的最长等待时间 request.timeout.ms = 10000 # socket的缓存大小 send.buffer.bytes = 100 * 1024 # key的序列化方式,如果没有设置,同serializer.class key.serializer.class # 分区的策略,默认是取模 partitioner.class =kafka.producer.DefaultPartitioner # 消息的压缩模式,默认是none,能够有gzip和snappy compression.codec = none # 能够针对默写特定的topic进行压缩 compressed.topics = null # 消息发送失败后的重试次数 message.send.max.retries = 3 # 每次失败后的间隔时间 retry.backoff.ms = 100 # 生产者定时更新topic元信息的时间间隔 ,如果设置为0,那么会在每一个消息发送后都去更新数据 topic.metadata.refresh.interval.ms = 600 * 1000 # 用户随意指定,可是不能重复,主要用于跟踪记录消息 client.id = "" # 异步模式下缓冲数据的最大时间。例如设置为100则会集合100ms内的消息后发送,这样会提升吞吐量,可是会增长消息发送的延时 queue.buffering.max.ms = 5000 # 异步模式下缓冲的最大消息数,同上 queue.buffering.max.messages = 10000 # 异步模式下,消息进入队列的等待时间。如果设置为0,则消息不等待,若是进入不了队列,则直接被抛弃 queue.enqueue.timeout.ms = -1 # 异步模式下,每次发送的消息数,当queue.buffering.max.messages或queue.buffering.max.ms知足条件之一时producer会触发发送。 batch.num.messages = 200