kafka学习笔记-生产者(二)
三、发送消息到Kafka
- 最简单的方式
- 一、生产者的send()方法将ProducerRecord对象做为参数,因此药建立一个ProducerRecord对象。
- 二、咱们使用生产者的send()方法发送ProducerRecord对象。
- 从生产者的架构图里面能够看到,消息先试被放进缓冲区,而后使用单独的线程发送到服务端。
- send()方法会返回一个包含RecordMetadata的Future对象,不过由于咱们会忽略返回值,因此没法知道消息是否发送成功
- 若是不关心发送结果,那么可使用这种发送方式。
- 三、其余异常
- 咱们能够忽略发送消息时可能发生的错误或在服务器端可能发生的错误,可是在发送消息以前,生产者我仍是有可能发生其余异常。
- 这些异常有多是SerializationException(序列化失败)、BufferExhaustedException或TimeoutException(说明缓冲区已经满了)又或者是InterruptException(说明线程被中断)
- 同步消息发送
- 一、Producer.send()
- 方法返回一个Future对象,而后调用Future对象的get()方法等待Kafka响应。
- 若是服务器返回作粗,get()方法会跑出异常,
- 若是没有发生错误,咱们会获得一个RecordMetaData对象,能够用它获取消息的偏移量。
- 二、若是在发送数据以前回或者在发送过程当中发生任何需错误,记录下来
- 可重试错误:链接错误:能够经过再次连接来解决。“无主”错误:能够经过从新为分区选举首领来解决。若是屡次重试任然没法解决,应用程序会收到一个重试异常。
- 没法经过重试解决:消息太大:对于这类错误,KafkaProducer不会进行任何重试,直接抛出异常。
- 异步发送消息
- 若是kafka集群之间一个来回须要10ms,若是在发送完每一个消息来回须要10ms.若是在发送完每一个消息以后都等待回应,那么发送一百个消息就须要1秒。但若是只发送消息不等待响应,那么发送100个消息。所须要的时间会少不少
- 大多数的状况下咱们并不须要等待响应。
- 一、为了使用回调,须要一个实现了org.apache.kafka.clients.producer.Callback接口的类,这个类只有一个onCompletion方法。
- 二、若是kafka返回一个错误,onCompletion方法会抛出一个非空异常。这里咱们只是简单的打印出来,可是生产环境应该有更好的处理方式。
- 三、记录与以前的同样
- 四、在发送消息时穿进去一个回调对象。
四、生产者配置
- acks:指定了必须有多少个分区副本接收到消息,生产者才会认为是消息写入成功了。这个参数对于消息丢失有重要的影响。
- acks=0:生产者在成功写入消息以前不会等待任何来自服务器的响应。肯能会致使数据丢失,可是吞吐量高
- acks=1:只要首节点收到消息,生产者就会收到一个来自服务器的成功响应。若是一个没有收到消息的节点成为新首领会致使消息丢失。这时候的吞吐量取决是同步发送仍是异步发送分。
- ack=all:只有当全部参与复制的节点所有收到消息时,生产者才会收到一个来自服务器的响应。这种模式是安全的名单时延迟高。
- buffer.memory 该参数用来设置生产者内存缓冲区的大小,生产者用它缓存要发送到服务器的消息
- 若是应用程序发送消息的速度超过发送到服务器的速度,会致使生产者空间不足
- 这个时候send()方法调用要么被阻塞,要么抛异常,取决于如何设置block.on.buffer.full参数。
- compression.type 它指定了消息被发送给broker以前使用哪种算法进行压缩,默认不压缩,该参数能够设置为snappy, gzip或lz4。
- snappy压缩算法是谷歌发明的,它占用较少的CPU,提供较好的性能和至关可观的压缩比,若是比较关注性能和网络宽带,可使用这种算法。
- gzip通常使用较多的CPU,可是会提供更高的压缩比,因此若是网络宽带有限,可使用这种算法。
- 使用压缩能够下降网络传输开销和存储开销,而这每每是kafka发送消息的瓶颈所在。
- retires:生产者收到的服务器返回的错误多是临时的(好比分区找不到首领),在这种状况下,这个参数决定了重发的次数。
- 默认状况下,生产者每次重试之间会等待100ms。能够经过retry.backoff.ms参数来改变时间间隔。
- 通常状况下,由于生产者会自动进行重试,因此就不必在代码逻辑处理那些可重试错误,你只须要处理那些不可重试的错误,或者重试次数超出上限的状况。
- batch.size 当多个消息须要被发送到同一个分区的时候,生产者会把他们放在同一个批次里。该参数指定了一个批次能够用的内存大小,按照字节计算(而不是消息个数)
- 当批次被填满,批次里面的全部消息会被发送出去。
- 生产者不必定会等到批次被占满才发送数据。
- 咱们通常把这个之设置的很大,也不会形成延迟,只是会多占用一些内存。
- 若是设置的过小,生产者须要更频繁的发送消息,会增长一些额外一些开销。
- linger.ms 该参数指定了生产者在发送批次以前等待更多消息加入批次的时间。
- kafkaProducer会在批次填满或者linger.ms达到上限时把批次发出去。
- 把linger.ms设置成比0大的数,让生产者在发送以前等待一下子,使更多的消息加入这个批次,虽然这样会增长延迟,可是也会提升吞吐量(由于一次性发送更多的消息,每一个消息的开销就变小了)
- client.id 消息标识,该参数能够是任何的字符串,服务器用它来识别消息来源,还能够用在日志和配额指标里面。
- max.in.flight.requests.per.connection: 指定了生产者在收到服务器响应以前能够发送多个消息。
- 值越大,占用的内存越大
- 值越大,吞吐量越高
- 设置为1,保证消息顺序发送,即便发生了重试。
- timeout.ms、request.timeout.ms、metadata.fetch.timeout.ms
- timeout.ms 指定了broker等待同步副本返回消息确认的时间,与acks的配置相匹配
- 若是指定时间没收到同步副本确认,broker返回一个错误
- request.timeout.ms 生产者在发送数据的时候等待服务器返回响应的时间。
- metadata.fetch.timeout.ms 生产者在获取元数据(好比目标分区的首领是谁)是等待服务器返回响应的时间。
- max.block.ms 指定了在调用send()方法或者使用partitionFor()方法获取元数据时生产者阻塞时间。
- 当生产者的发送缓冲区已满,或没有可用元数据,这些方法就会阻塞
- 在阻塞时间达到配置的值时,生产者就会跑出异常
- max.request.size 用于控制生产者发送的请求大小。能够指发送消息的单个大小,也能够指单个请求里全部消息的大小。
- receive.buffer.bytes、send.buffer.bytes 分别指定了Tcp socket接收和发送数据的缓冲区大小。
- -1,就是使用系统的默认值
- 若是生产者或消费者与broker处于不一样的数据中心,那么能够适当增大这个值。
欢迎关注本站公众号,获取更多信息