Apache Kafka® 是 一个分布式流处理平台apache
上面是官网的介绍,和通常的消息处理系统相比,不一样之处在于:编程
和其余的消息系统之间的对比:安全
对比指标 | kafka | activemq | rabbitmq | rocketmq |
---|---|---|---|---|
背景 | Kafka 是LinkedIn 开发的一个高性能、分布式的消息系统,普遍用于日志收集、流式数据处理、在线和离线消息分发等场景 | ActiveMQ是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件, 为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通讯。 | RabbitMQ是一个由erlang开发的AMQP协议(Advanced Message Queue )的开源实现。 | RocketMQ是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给Apache基金会,已经于2016年11月成为 Apache 孵化项目 |
开发语言 | Java、Scala | Java | Erlang | Java |
协议支持 | 本身实现的一套 | JMS协议 | AMQP | JMS、MQTT |
持久化 | 支持 | 支持 | 支持 | 支持 |
producer容错 | 在kafka中提供了acks配置选项, acks=0 生产者在成功写入悄息以前不会等待任何来自服务器的响应 acks=1 只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应 acks=all 只有当全部参与复制的节点所有收到消息时,生产者才会收到一个来自服务器的成功响应,这种模式最安全 | 发送失败后便可重试 | 有ack模型。 ack模型可能重复消息 ,事务模型保证彻底一致 | 和kafka相似 |
吞吐量 | kafka具备高的吞吐量,内部采用消息的批量处理,zero-copy机制,数据的存储和获取是本地磁盘顺序批量操做,具备O(1)的复杂度,消息处理的效率很高 | rabbitMQ在吞吐量方面稍逊于kafka,他们的出发点不同,rabbitMQ支持对消息的可靠的传递,支持事务,不支持批量的操做;基于存储的可靠性的要求存储能够采用内存或者硬盘。 | kafka在topic数量很少的状况下吞吐量比rocketMq高,在topic数量多的状况下rocketMq比kafka高 | |
负载均衡 | kafka采用zookeeper对集群中的broker、consumer进行管理,能够注册topic到zookeeper上;经过zookeeper的协调机制,producer保存对应topic的broker信息,能够随机或者轮询发送到broker上;而且producer能够基于语义指定分片,消息发送到broker的某分片上 | rabbitMQ的负载均衡须要单独的loadbalancer进行支持 | NamerServer进行负载均衡 |
架构图:bash
public class Producer extends Thread {
private final KafkaProducer<Integer, String> producer;
private final String topic;
private final Boolean isAsync;
public Producer(String topic, Boolean isAsync) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producer = new KafkaProducer<>(props);
this.topic = topic;
this.isAsync = isAsync;
}
@Override
public void run() {
int messageNo = 1;
while (true) {
String messageStr = "Message_" + messageNo;
long startTime = System.currentTimeMillis();
if (isAsync) { // Send asynchronously
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr), new DemoCallBack(startTime, messageNo, messageStr));
} else { // Send synchronously
try {
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr)).get();
System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
++messageNo;
}
}
class DemoCallBack implements Callback {
private final long startTime;
private final int key;
private final String message;
public DemoCallBack(long startTime, int key, String message) {
this.startTime = startTime;
this.key = key;
this.message = message;
}
/**
* A callback method the user can implement to provide asynchronous handling of request completion. This method will
* be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
* non-null.
*
* @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error
* occurred.
* @param exception The exception thrown during processing of this record. Null if no error occurred.
*/
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
long elapsedTime = System.currentTimeMillis() - startTime;
if (metadata != null) {
System.out.println(
"message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
"), " +
"offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
} else {
exception.printStackTrace();
}
}
}
}
复制代码
public class Consumer extends Thread {
private final KafkaConsumer<Integer, String> consumer;
private final String topic;
public Consumer(String topic) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
this.topic = topic;
}
@Override
public void run() {
while (true) {
consumer.subscribe(Collections.singletonList(this.topic));
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1).getSeconds());
for (ConsumerRecord<Integer, String> record : records) {
System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
}
}
}
}
复制代码
public class KafkaProperties {
public static final String TOPIC = "topic1";
public static final String KAFKA_SERVER_URL = "localhost";
public static final int KAFKA_SERVER_PORT = 9092;
public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64 * 1024;
public static final int CONNECTION_TIMEOUT = 100000;
public static final String TOPIC2 = "topic2";
public static final String TOPIC3 = "topic3";
public static final String CLIENT_ID = "SimpleConsumerDemoClient";
private KafkaProperties() {}
}
复制代码
小尾巴走一波,欢迎关注个人公众号,不按期分享编程、投资、生活方面的感悟:)服务器