bootstrap.servers
该参数为 broker
地址,不须要所有都填,由于 kafka
会从当前 broker
中获取其余 broker
信息。不过为了某个 broker
挂掉,通常填多个 broker
地址java
key.serializer
消息 key
如何序列化apache
value.serializer
消息内容如何序列化bootstrap
示例代码ide
Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
在消息发送前,对消息进行处理,该动做发生在序列化器
、分区器
以前。编码
实现 org.apache.kafka.clients.producer.ProducerInterceptor
接口,便可自定义拦截器code
介绍一下接口定义的方法server
ProducerRecord<K, V> onSend(ProducerRecord<K, V> record)
在消息发送以前,能够对消息进行处理接口
void onAcknowledgement(RecordMetadata metadata, Exception exception
消息被应答以前或者消息发送失败时被调用get
void close()
producer
被关闭时,会调用kafka
kafka 容许配置拦截器链,多个拦截器用 , 号隔开便可。
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TestProducerInterceptor.class.getName() + "," + TestProducerInterceptor2.class.getName());
序列化发生在分区器
以前
实现 org.apache.kafka.common.serialization.StringSerializer
接口便可自定义序列化
介绍一下接口定义的方法
void configure(Map<String, ?> configs, boolean isKey)
在 StringSerializer
实现中,用于设置编码
byte[] serialize(String topic, String data)
定义如何序列化
void close()
producer
关闭时,被调用
实现 org.apache.kafka.clients.producer.Partitioner
便可自定义分区器
kafka 可按 key 进行哈希(MurmurHash2),将消息发往同一个分区。若是未指定 key,那么将会把消息发往随机的一个分区。
介绍一下接口定义的方法
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)
定义发往哪一个分区;具体的实现可参考DefaultPartitioner
void close()
producer
关闭时,被调用
kafka
一致,rocketMQ
容许生产者将消息发送到指定的 'partition' 中rocketMQ
没有 序列化器
的概念。消息内容由 rocketMQ
自行序列化rocketMQ
也没有提供相似 拦截器
概念rocketMQ
提供了 hock
以此在消息发送前,和消息发送后,对消息进行处理例如:
DefaultMQProducer producer = new DefaultMQProducer("default"); producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageHook() { @Override public String hookName() { return null; } @Override public void sendMessageBefore(SendMessageContext context) { } @Override public void sendMessageAfter(SendMessageContext context) { } });