Kafka生产者APi

kafka客户端发布record(消息)到kafka集群。

新的生产者是线程安全的,在线程之间共享单个生产者实例,一般单例比多个实例要快。html

一个简单的例子,使用producer发送一个有序的key/value(键值对),放到java的main方法里就能直接运行,java

Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for(int i = 0; i < 100; i++) producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); producer.close(); 

生产者的缓冲空间池保留还没有发送到服务器的消息,后台I/O线程负责将这些消息转换成请求发送到集群。若是使用后不关闭生产者,则会泄露这些资源。算法

send()方法是异步的,添加消息到缓冲区等待发送,并当即返回。生产者将单个的消息批量在一块儿发送来提升效率。apache

ack是判别请求是否为完整的条件(就是是判断是否是成功发送了)。咱们指定了“all”将会阻塞消息,这种设置性能最低,可是是最可靠的。bootstrap

retries,若是请求失败,生产者会自动重试,咱们指定是0次,若是启用重试,则会有重复消息的可能性。api

producer(生产者)缓存每一个分区未发送消息。缓存的大小是经过 batch.size 配置指定的。值较大的话将会产生更大的批。并须要更多的内存(由于每一个“活跃”的分区都有1个缓冲区)。缓存

默认缓冲可当即发送,即使缓冲空间尚未满,可是,若是你想减小请求的数量,能够设置linger.ms大于0。这将指示生产者发送请求以前等待一段时间,但愿更多的消息填补到未满的批中。这相似于TCP的算法,例如上面的代码段,可能100条消息在一个请求发送,由于咱们设置了linger(逗留)时间为1毫秒,而后,若是咱们没有填满缓冲区,这个设置将增长1毫秒的延迟请求以等待更多的消息。须要注意的是,在高负载下,相近的时间通常也会组成批,即便是 linger.ms=0。在不处于高负载的状况下,若是设置比0大,以少许的延迟代价换取更少的,更有效的请求。安全

buffer.memory 控制生产者可用的缓存总量,若是消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其余发送调用将被阻塞,阻塞时间的阈值经过max.block.ms设定,以后它将抛出一个TimeoutException。服务器

key.serializervalue.serializer示例,将用户提供的key和value对象ProducerRecord转换成字节,你可使用附带的ByteArraySerializaerStringSerializer处理简单的string或byte类型。oracle

send()

public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback) 

异步发送一条消息到topic,并调用callback(当发送已确认)。

send是异步的,而且一旦消息被保存在等待发送的消息缓存中,此方法就当即返回。这样并行发送多条消息而不阻塞去等待每一条消息的响应。

发送的结果是一个RecordMetadata,它指定了消息发送的分区,分配的offset和消息的时间戳。若是topic使用的是CreateTime,则使用用户提供的时间戳或发送的时间(若是用户没有指定指定消息的时间戳)若是topic使用的是LogAppendTime,则追加消息时,时间戳是broker的本地时间。

因为send调用是异步的,它将为分配消息的此消息的RecordMetadata返回一个Future。若是future调用get(),则将阻塞,直到相关请求完成并返回该消息的metadata,或抛出发送异常。

若是要模拟一个简单的阻塞调用,你能够调用get()方法。

byte[] key = "key".getBytes(); byte[] value = "value".getBytes(); ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value) producer.send(record).get(); 

彻底无阻塞的话,能够利用回调参数提供的请求完成时将调用的回调通知。

ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value); producer.send(myRecord, new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { if(e != null) e.printStackTrace(); System.out.println("The offset of the record we just sent is: " + metadata.offset()); } }); 

发送到同一个分区的消息回调保证按必定的顺序执行,也就是说,在下面的例子中 callback1 保证执行 callback2 以前:

producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1); producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2); 

注意:callback通常在生产者的I/O线程中执行,因此是至关的快的,不然将延迟其余的线程的消息发送。若是你须要执行阻塞或计算昂贵(消耗)的回调,建议在callback主体中使用本身的Executor来并行处理。

pecified by:
send in interface Producer<K,V> 
Parameters:

record - 发送的记录(消息)
callback - 用户提供的callback,服务器来调用这个callback来应答结果(null表示没有callback)。

Throws:

InterruptException - 若是线程在阻塞中断。SerializationException - 若是key或value不是给定有效配置的serializers。TimeoutException - 若是获取元数据或消息分配内存话费的时间超过max.block.ms。KafkaException - Kafka有关的错误(不属于公共API的异常)。

做者:無名 连接:http://orchome.com/303 来源:OrcHome 著做权归做者全部。商业转载请联系做者得到受权,非商业转载请注明出处。
相关文章
相关标签/搜索