Dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.9.0.0</version>
</dependency>
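Version 0.9 appears to have reworked the consumer API: it is implemented differently from the pre-0.9 consumers and unifies them behind a single consumer API. See http://kafka.apache.org/documentation.html#consumerapi for details.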
Producer implementation:
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {

    public void send() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.52.128:9092");
        // Wait until all in-sync replicas have acknowledged each request, so a successful send is durable
        props.put("acks", "all");
        // Number of retries after a failed send
        props.put("retries", 1);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Class that chooses the target partition for each message;
        // defaults to org.apache.kafka.clients.producer.internals.DefaultPartitioner
        // props.put("partitioner.class", "com.test.kafka.simple.SimplePartitioner");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<String, String>(
                        "mykafka", "mykafka" + i, "hello kafka " + i));
            }
        } finally {
            // close() flushes any records that are still buffered
            producer.close();
        }
    }

    public static void main(String[] args) {
        new SimpleProducer().send();
    }
}
Consumer implementation:
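import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleConsumer {

    private static final Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);

    public void poll() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.52.128:9092");
        // Commit offsets automatically at a fixed interval;
        // alternatively, call KafkaConsumer.commitSync() to commit manually
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "5000");
        // Heartbeat used to keep the group session alive. The interval must be smaller than
        // session.timeout.ms; at most 1/3 of the session timeout is recommended
        props.put("heartbeat.interval.ms", "5000");
        props.put("session.timeout.ms", "30000");
        props.put("group.id", "test-consumer-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Create the consumer outside the try block so that finally never sees a null reference
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        try {
            Map<String, List<PartitionInfo>> topics = consumer.listTopics();
            logger.info("Available topics: {}", topics.keySet());

            consumer.subscribe(Arrays.asList("mykafka"));
            while (true) {
                // Wait up to 1000 ms for new records
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        new SimpleConsumer().poll();
    }
}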
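The comment on the heartbeat settings states a rule of thumb: heartbeat.interval.ms should be no more than one third of session.timeout.ms. As a minimal sketch, that rule can be checked in plain Java before the consumer is created. The class name ConsumerTimeoutCheck and the method heartbeatWithinGuideline are hypothetical helpers invented for this illustration; they are not part of the Kafka API, and the check assumes both values were stored as strings.

```java
import java.util.Properties;

public class ConsumerTimeoutCheck {

    // Hypothetical helper (not a Kafka API): returns true when
    // heartbeat.interval.ms is positive and at most one third of session.timeout.ms.
    // Assumes both properties are present and stored as numeric strings.
    public static boolean heartbeatWithinGuideline(Properties props) {
        long heartbeatMs = Long.parseLong(props.getProperty("heartbeat.interval.ms"));
        long sessionMs = Long.parseLong(props.getProperty("session.timeout.ms"));
        return heartbeatMs > 0 && heartbeatMs * 3 <= sessionMs;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Same values as in the consumer example
        props.setProperty("heartbeat.interval.ms", "5000");
        props.setProperty("session.timeout.ms", "30000");
        System.out.println(heartbeatWithinGuideline(props)); // prints "true"
    }
}
```

With the values used in the consumer (5000 ms and 30000 ms) the heartbeat interval is one sixth of the session timeout, so the check passes.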
You can also plug in your own serializer implementation and your own strategy for assigning messages to partitions:
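import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

// Note: this implements the old (pre-0.9) producer's kafka.producer.Partitioner interface.
// The KafkaProducer used above expects an implementation of
// org.apache.kafka.clients.producer.Partitioner for "partitioner.class" instead.
public class SimplePartitioner implements Partitioner {

    // The old producer instantiates partitioners through this constructor
    public SimplePartitioner(VerifiableProperties props) {
    }

    // Uses the number after the last '.' in the key, modulo the partition count;
    // keys without such a suffix go to partition 0
    @Override
    public int partition(Object key, int numPartitions) {
        int partition = 0;
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(stringKey.substring(offset + 1)) % numPartitions;
        }
        return partition;
    }
}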
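To see how this key-suffix rule behaves without a running broker, the selection logic can be pulled out into a plain static method. The following is a sketch under stated assumptions rather than the original author's code: the class name KeySuffixPartitioning and the method partitionFor are hypothetical, and unlike the original it falls back to partition 0 for non-numeric suffixes and keeps negative suffixes inside the valid range.

```java
public class KeySuffixPartitioning {

    // Hypothetical helper (not a Kafka API): maps a key such as "mykafka.7"
    // to a partition index in [0, numPartitions).
    public static int partitionFor(String key, int numPartitions) {
        if (key == null || numPartitions <= 0) {
            return 0;
        }
        int dot = key.lastIndexOf('.');
        // As in the original, a missing dot or a dot at index 0 means "no suffix";
        // a trailing dot is also treated as no suffix
        if (dot <= 0 || dot == key.length() - 1) {
            return 0;
        }
        try {
            int suffix = Integer.parseInt(key.substring(dot + 1));
            // Double modulo keeps the result non-negative for negative suffixes
            return ((suffix % numPartitions) + numPartitions) % numPartitions;
        } catch (NumberFormatException e) {
            // Assumption: non-numeric suffixes fall back to partition 0
            return 0;
        }
    }

    public static void main(String[] args) {
        String[] keys = {"mykafka.0", "mykafka.7", "mykafka7", "mykafka.x"};
        for (String key : keys) {
            System.out.println(key + " -> " + partitionFor(key, 3));
        }
    }
}
```

Note that the producer example generates keys of the form "mykafka0", "mykafka1", and so on, which contain no '.', so this rule would send all of them to partition 0. For the 0.9 KafkaProducer, the same logic would go inside the partition(...) method of a class implementing org.apache.kafka.clients.producer.Partitioner.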
The examples above are based on the official demo. Spring also offers Kafka integration through spring-integration-kafka; see https://github.com/spring-projects/spring-integration-kafka for details.