Kafka 经过 KafkaProducer 构造器初始化生产者客户端的配置。
经常使用的重要配置,详见官网。html
// 基础配置 Map<String, Object> configs = new HashMap<>(); // Kafka broker 集群 configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094"); // key 序列化 configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // value 序列化 configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
Kafka 提供了6种构造器来构造消息。apache
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers); public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value); public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers); public ProducerRecord(String topic, Integer partition, K key, V value); public ProducerRecord(String topic, K key, V value); public ProducerRecord(String topic, V value);
支持同步发送和异步发送消息。bootstrap
同步发送app
producer.send(record).get();
异步发送异步
producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { // 回调处理流程 } });