在团800运维工做总结之kafka集群平常工做经验总结

  1. 一些重要的原理java

    基本原理什么叫broker partition cg我就不在这里说了,说一些本身总结的原理python



    1.kafka有副本的概念,每一个副本都分在不一样的partition中,这中间分为leader和fllower正则表达式

    2.kafka消费端的程序必定要和partition数量一致,不能够多,会出现有些consumer获取算法

    不到数据的现象json

    3.producer原理网络

    producer经过zookeeper获取所链接的topic都在那些partiton中,每一个parition的leader是那app

    个,针对leader进行写操做,prodcer经过zookeeper的watch机制来记录以上的信息,pro负载均衡

    ducer为了节省网络的io,还会在本地先把消息buffer起来,并将他们批量发送到broker中dom

    4.consumer原理异步

    consumer向broker发送fetch请求,并告知获取的消息offset,在kafka中采用pull方式,消费端

    主动pull消息,优势:消费者能够控制消费的数量



2.kafka生产环境经常使用命令总结

  

1.模拟生产端,推送数据

./bin/kafka-console-producer.sh --broker-list 172.16.10.130:9092 --topic deal_exposure_origin


2.模拟消费端,消费数据

./bin/kafka-console-consumer.sh --zookeeper 1172.16.10.140:2181  --topic deal_exposure_origin


3.建立topic,topic partiton数量 副本数 数据过时时间

./kafka-topics.sh --zookeeper spark:2181 --create --topic deal_task_log  --partitions 15 --replication-factor 1 retention.ms 1296000000


3.kafka如何动态的添加副本

1.副本,kafka必定要设置副本,若是以后再加会因为涉及到数据的同步,会把集群的io提高上去


3.如何扩大副本

2.把全部topic的信息记录到json文件中,信息有topic名称,用了哪些partition,副本在那个partition,

并修改json数据,添加副本数

#!/usr/bin/python

from kazoo.client import KazooClient

import random

import json


zk = KazooClient(hosts='172.16.11.73:2181')

zk.start()

for i in zk.get_children('/brokers/topics'):

        b= zk.get('/brokers/topics/'+i)[0]

        a = eval(b)['partitions']

        list = []

        dict = {}

        for key,value in a.items():

                if len(value) == 1:

                        c = {}

                        c['topic'] = i.encode('utf-8')

                        c['partition'] = int(key)

                        list1 = []

                        for ii in range(0,3):

                                while True:

                                        if list1:

                                                pass

                                        else:

                                                for iii in value:

                                                        list1.append(iii)

                                        if len(list1) == 3:

                                                break

                                        num = random.randint(0,4)

                                        #print 'num='+str(num),'value='+str(value)

                                        if num not in list1:

                                                list1.append(num)

                        #print list1

                        c['replicas'] = list1

                        list.append(c)


        version = eval(b)['version']

        dict['version'] = version

        dict['partitions'] = list

        #jsondata = json.dumps(dict)

        json.dump(dict,open('/opt/json/'+i+'.json','w'))

3.加载json文件

/usr/local/kafka_2.9.2-0.8.1.1/bin/kafka-reassign-partitions.sh --zookeeper 192.168.5.159:2181 --reassignment-json-file /opt/test.json --execute


4.查看是否已经添加了副本

usr/local/kafka_2.9.2-0.8.1.1/bin/kafka-topics.sh --describe --zookeeper 192.168.5.159:2181 --topic testtest

Topic:testtest  PartitionCount:15       ReplicationFactor:2     Configs:

        Topic: testtest Partition: 0    Leader: 0       Replicas: 0,1   Isr: 0,1

        Topic: testtest Partition: 1    Leader: 0       Replicas: 0,1   Isr: 0,1

        Topic: testtest Partition: 2    Leader: 0       Replicas: 0,1   Isr: 0,1

        Topic: testtest Partition: 3    Leader: 0       Replicas: 0,1   Isr: 0,1

        Topic: testtest Partition: 4    Leader: 0       Replicas: 0,1   Isr: 0,1

        Topic: testtest Partition: 5    Leader: 0       Replicas: 0,1   Isr: 0,1

        Topic: testtest Partition: 6    Leader: 0       Replicas: 0,1   Isr: 0,1

        Topic: testtest Partition: 7    Leader: 0       Replicas: 0,1   Isr: 0,1

        Topic: testtest Partition: 8    Leader: 0       Replicas: 0,1   Isr: 0,1

        Topic: testtest Partition: 9    Leader: 0       Replicas: 0,1   Isr: 0,1

        Topic: testtest Partition: 10   Leader: 0       Replicas: 0,1   Isr: 0,1

        Topic: testtest Partition: 11   Leader: 0       Replicas: 0,1   Isr: 0,1

        Topic: testtest Partition: 12   Leader: 0       Replicas: 0,1   Isr: 0,1

        Topic: testtest Partition: 13   Leader: 0       Replicas: 0,1   Isr: 0,1

        Topic: testtest Partition: 14   Leader: 0       Replicas: 0,1   Isr: 0,1


4.kafka集群之间作数据同步

找一个broker节点进行同步

1.建立配置文件mirror_consumer.config

配置文件里写本地的kafka集群zookeeper

定义一个group用来去消费全部的topic,进行同步

zookeeper.connect=172.16.11.43:2181,172.16.11.46:2181,172.16.11.60:2181,172.16.11.67:2181,172.16.11.73:2181

group.id=backup-mirror-consumer-group


2.建立配置文件mirror_producer.config

zookeeper,kafka ip写对端集群的ip

zookeeper.connect=172.17.1.159:2181,172.17.1.160:2181

metadata.broker.list=172.17.1.159:9092,172.17.1.160:9092


3.同步命令

$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config sourceClusterConsumer.config --num.streams 2 --producer.config targetClusterProducer.config --whitelist=".*"


参数详解

1. 白名单(whitelist) 黑名单(blacklist)

mirror-maker接受精确指定同步topic的白名单和黑名单。使用java标准的正则表达式,为了方便,逗号(‘,’)被编译为java正则中的(‘|’)。

2. Producer timeout

为了支持高吞吐量,你最好使用异步的内置producer,并将内置producer设置为阻塞模式(queue.enqueueTimeout.ms=-1)。这样能够保证数据(messages)不会丢失。不然,异步producer默认的 enqueueTimeout是0,若是producer内部的队列满了,数据(messages)会被丢弃,并抛出QueueFullExceptions异常。而对于阻塞模式的producer,若是内部队列满了就会一直等待,从而有效的节制内置consumer的消费速度。你能够打开producer的的trace logging,随时查看内部队列剩余的量。若是producer的内部队列长时间处于满的状态,这说明对于mirror-maker来讲,将消息从新推到目标Kafka集群或者将消息写入磁盘是瓶颈。

对于kafka的producer同步异步的详细配置请参考$KAFKA_HOME/config/producer.properties文件。关注其中的producer.type和queue.enqueueTimeout.ms这两个字段。

3. Producer 重试次数(retries)

若是你在producer的配置中使用broker.list,你能够设置当发布数据失败时候的重试次数。retry参数只在使用broker.list的时候使用,由于在重试的时候会从新选择broker。

4. Producer 数量

经过设置—num.producers参数,可使用一个producer池来提升mirror maker的吞吐量。在接受数据(messages)的broker上的producer是只使用单个线程来处理的。就算你有多个消费流,吞吐量也会在producer处理请求的时候被限制。

5. 消费流(consumption streams)数量

使用—num.streams能够指定consumer的线程数。请注意,若是你启动多个mirror maker进程,你可能须要看看其在源Kafka集群partitions的分布状况。若是在每一个mirror maker进程上的消费流(consumption streams)数量太多,某些消费进程若是不拥有任何分区的消费权限会被置于空闲状态,主要缘由在于consumer的负载均衡算法。

6. 浅迭代(Shallow iteration)与producer压缩

咱们建议在mirror maker的consumer中开启浅迭代(shallow iteration)。意思就是mirror maker的consumer不对已经压缩的消息集(message-sets)进行解压,只是直接将获取到的消息集数据同步到producer中。

若是你开启浅迭代(shallow iteration),那么你必须关闭mirror maker中producer的压缩功能,不然消息集(message-sets)会被重复压缩。

7. Consumer 和 源Kafka集群(source cluster)的 socket buffer sizes

镜像常常用在跨集群场景中,你可能但愿经过一些配置选项来优化内部集群的通讯延迟和特定硬件性能瓶颈。通常来讲,你应该对mirror-maker中consumer的socket.buffersize 和源集群broker的socket.send.buffer设定一个高的值。此外,mirror-maker中消费者(consumer)的fetch.size应该设定比socket.buffersize更高的值。注意,套接字缓冲区大小(socket buffer size)是操做系统网络层的参数。若是你启用trace级别的日志,你能够检查实际接收的缓冲区大小(buffer size),以肯定是否调整操做系统的网络层。



4.如何检验MirrorMaker运行情况

Consumer offset checker工具能够用来检查镜像对源集群的消费进度。例如:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group KafkaMirror --zkconnect localhost:2181 --topic test-topic

KafkaMirror,topic1,0-0 (Group,Topic,BrokerId-PartitionId)

            Owner = KafkaMirror_jkoshy-ld-1320972386342-beb4bfc9-0

  Consumer offset = 561154288

                  = 561,154,288 (0.52G)

         Log size = 2231392259

                  = 2,231,392,259 (2.08G)

     Consumer lag = 1670237971

                  = 1,670,237,971 (1.56G)

BROKER INFO

0 -> 127.0.0.1:9092

注意,–zkconnect参数须要指定到源集群的Zookeeper。另外,若是指定topic没有指定,则打印当前消费者group下全部topic的信息。



5.kafka所使用的磁盘io太高解决方法

问题:kafka所用磁盘io太高

咱们生产平台有5台kafka机器,每台机器上分了2块磁盘作parition

最近发现kafka所使用的磁盘io很是高,影响到了生产端推送数据的性能

一开始觉得是因为一个推送日志的topic所致使的的,由于每秒推送数据大概在2w左右,

后来把此topic迁移到了其余的kafka集群中仍是未见效果

最终iotop发现实际上是因为zookeeper持久化的时候致使的

zookeeper持久化的时候也写到kafka所用到的磁盘中


经过此问题说明几点问题

1.kafka用zookeeper,和你们所熟悉的其余应用例如solrcloud codis otter不太同样

通常用zookeeper都是管理集群节点用,而kafka用zookeeper是核心,生产端和消费端都会去

连接zookeeper获取响应的信息

生产端经过连接zookeeper获取topic都用了那些parition,每一个parition的副本的leader是那个

消费端连接zookeeper获取offset,消费端消费都会操做对zookeeper的数据进行修改,对io的操做

很频繁


解决方法:

禁止zookeeper作持久化操做

配置文件中添加一行

forceSync=no

问题解决

相关文章
相关标签/搜索