注:此文并不是官方文档的翻译java
kafka的producer默认是异步的方式,在大数据量的状况下可能会出现丢失数据的状况.可是同步的方式又比较低效,所以合理设置异步producer下的kafka参数既能够提升效率又能够不丢失数据.只是要对各参数有一个比较深刻的了解.下面是我总结的对于处理安全外几乎全部producer参数的理解:python
以python客户端为例子,java的参数名可能稍有不一样可是含义是同样的算法
例子: producer = KafkaProducer(bootstrap_servers='xxx1:9092,xxx2:9092', acks=1,retries =3, batch_size=524288, reconnect_backoff_max_ms=3000, buffer_memory=536870912 ) // producer默认是异步的 f = producer.send("topic_name","hello") //f.get(timeout=3) 若是加了get就变成了同步,也就是说要等待get到服务端返回的结果后再往下执行
格式为host[:port]例如localhost:9092,是kafka链接的broker地址列表,能够是多台,用逗号分隔bootstrap
客户端名称,用来追查日志的,默认是kafka-python-producer-# (#是个惟一编号)安全
key序列化函数. 默认值: None.服务器
值序列化函数默认值: None.网络
表明kafka收到消息的答复数,0就是不要答复,爱收到没收到.1就是有一个leader broker答复就行,all是全部broker都要收到才行app
0: Producer不等待kafka服务器的答复,消息马上发往socket buffer,这种方式不能保证kafka收到消息,设置成这个值的时候retries参数就失效了,由于producer不知道kafka收没收到消息,因此所谓的重试就没有意义了,发送返回值的offset全默认是-1.异步
1: 等待leader记录数据到broker本地log便可.不等待leader同步到其余followers,那么假如此时恰好leader收到消息并答复后,leader忽然挂了,其余fowller还没来得及复制消息呢,那么这条消息就会丢失了.socket
all:等待全部broker记录消息.保证消息不会丢失(只要从节点没全挂),这种方式是最高可用的 acks默认值是1.
发消息时候的压缩类型能够是gzip,snappy,lz4,None,压缩是针对batches的,因此batches的大小会影响压缩效率,大一点的压缩比例可能好些,要是过小的话压缩就没有意义了,好比你就发个几个字节的数据那压完没准更大了.至于何时启用压缩,要看应用场景,启用后producer会变慢,但网络传输带宽占用会减小,带宽紧缺建议开启压缩,带宽充足就不用开了 默认值:None
重试发送次数,有时候网络出现短暂的问题的时候,会自动重发消息,前面提到了这个值是须要在acks=1或all时候才有效.若是设置了该参数,可是setting max_in_flight_requests_per_connection没有设置为1的话,可能形成消息顺序的改变,由于若是2个batches发到同一个partition,可是第一个失败重发了,那么就会形成第二个batches跑到前面去了. Default: 0.
批处理消息字节数,发往broker的消息会包含多个batches,每一个分区对应一个batch,batch小了会减少响吞吐量,batch为0的话就禁用了batch发送,默认值:16384(16kb)
逗留时间,这个逗留指的是消息不当即发送,而是逗留这个时间后一块发送,这个设置是比较有用的,有时候消息产生的要比可以发送的要快,这个参数完美的实现了一我的工的延迟,使得大批量能够聚合到一个batch里一块发送.当batch慢了的话,会忽略这个参数当即发送,这个参数有点相似TCP协议中的Nagle算法. Default: 0.
分区函数,用来人工干预消息发到到哪一个分区,这个函数会在key serialization后调用,函数原型:partitioner(key_bytes, all_partitions, available_partitions), 默认是对key作murmur2算法的hash(跟java客户端算法相同),hash值相同的到一个分区,没有key的话就是随机分区.
当消息发送速度大于kafka服务器接收的速度,producer会阻塞max_block_ms,超时会报异常,buffer_memory用来保存等待发送的消息,默认33554432(32MB)
当buffer满了或者metadata获取不到(好比leader挂了),或者序列化没完成分区函数没计算完等等状况下的最大阻塞时间,默认60000ms (60秒)
消息的最大大小限制,也就是说send的消息大小不能超过这个限制,这个限制只改一个地方是不行的producer, broker, consumer都要改才行. Default: 1048576.(1MB)
metadata的刷新时间,每通过这个时间就刷新下metadata来发现新的分区或broker,无论有没有都会刷新下看看 Default: 300000(5分钟)
– 重试发送若是仍是错误要等待下次重试的时间,单位毫秒 Default: 100.
发送请求的超时时间 Default: 30000.(30秒)
TCP receiver buffer(SO_RCVBUF)大小,就是接收数据的缓冲区大小 默认:None(根据操做系统设置).Java Client默认是32768
TCP send buffer (SO_SNDBUF) 发送数据的缓冲区大小 默认:None(根据操做系统设置).Java Client默认是131072.
socket选项 默认: [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
尝试从新链接broker的时间间隔 Default: 50.
reconnect_backoff_ms每次再次链接失败会以指数增加,增加到的最大限度就是这个参数,为了不链接风暴,链接重试的时间间隔会在一个范围内随机调整,上浮或下调20%,也就是说每次重连的时间间隔不必定就是这个值自己,而是上下浮动20%. 另外这里解释下链接风暴,当咱们的kafka集群出现问题后,全部的producer和consumer都会尝试重连,重连的间隔就会达到这个参数所设置的最大值,好比你们都是每秒尝试重连,这时候若是集群回复了,那么在同一秒可能就会有大量的链接打到kafka集群上,这就形成了链接风暴,可是若是随机上下浮动就可能把重连时间给错开,不会形成同事的大量链接 Default: 1000.
发送多少条消息后,接收服务端确认,好比设置为1,就是每发一条就要确认一条,设置为5就是,发送5条消息等待一次确认 ,若是大于1,好比5,这种状况下是会有一个顺序问题的,就是这5条消息其中的一条发送失败了,若是进行重试,那么重发的消息实际上是在下个批次的,这就会形成消息顺序的错乱,因此若是须要保证消息的顺序,建议设置此参数为1 Default: 5.