Kafka Publish-Subscribe Implementation

    Dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.9.0.0</version>
</dependency>
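
    Note that kafka_2.10 is the full broker artifact. If you only need the new Java producer and consumer, the lighter kafka-clients artifact (which kafka_2.10 already pulls in transitively) should be enough:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.0</version>
</dependency>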

    The new 0.9 release apparently reworked the consumer-related API: it differs from the pre-0.9 implementations and unifies the previously separate consumer APIs into a single one. See http://kafka.apache.org/documentation.html#consumerapi for details.

    Producer implementation:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {

    public void send() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.52.128:9092");
        // wait for acknowledgment from all in-sync replicas, so a successful send means the message is safely stored
        props.put("acks", "all");
        // number of retries on transient send failures
        props.put("retries", 1);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // controls which partition each message goes to; defaults to org.apache.kafka.clients.producer.internals.DefaultPartitioner
//        props.put("partitioner.class", "com.test.kafka.simple.SimplePartitioner");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("mykafka", "mykafka" + Integer.toString(i), "hello kafka " + Integer.toString(i)));
        }
        producer.close();
    }

    public static void main(String[] args) {
        new SimpleProducer().send();
    }

}

    Consumer implementation:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleConsumer {

    private static final Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);

    public void poll() {
        KafkaConsumer<String, String> consumer = null;
        try {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.52.128:9092");
            // commit offsets automatically at a fixed interval; alternatively commit manually with KafkaConsumer.commitSync()
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "5000");
            // heartbeat used to detect session liveness; must be less than session.timeout.ms, ideally no more than 1/3 of it
            props.put("heartbeat.interval.ms", "5000");
            props.put("session.timeout.ms", "30000");
            props.put("group.id", "test-consumer-group");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<String, String>(props);

            // list all topics visible to this consumer (not used below, shown for reference)
            Map<String, List<PartitionInfo>> topics = consumer.listTopics();
            consumer.subscribe(Arrays.asList("mykafka"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
                    System.out.println();
                }
            }
        } finally {
            if (consumer != null) {
                consumer.close();
            }
        }
    }

    public static void main(String[] args) {
        new SimpleConsumer().poll();
    }

}
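
    The auto-commit comment above mentions committing manually instead. A minimal sketch of that pattern, assuming the same broker, topic, and group as above; ManualCommitConsumer is a hypothetical name and the println stands in for real record handling:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumer {

    public void poll() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.52.128:9092");
        props.put("group.id", "test-consumer-group");
        // take over commit responsibility from the client
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        try {
            consumer.subscribe(Arrays.asList("mykafka"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // placeholder for the actual processing step
                    System.out.println("processing offset " + record.offset());
                }
                // commit only after the whole batch has been processed (at-least-once delivery)
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }
}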

    You can also plug in your own serializer implementation and customize how messages are assigned to partitions. Note that with the 0.9 producer a custom partitioner implements org.apache.kafka.clients.producer.Partitioner; a constructor taking a VerifiableProperties belongs to the old pre-0.9 Scala producer API:

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class SimplePartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        int partition = 0;
        String stringKey = (String) key;
        // route by the numeric suffix after the last '.' in the key,
        // e.g. a key ending in ".3" maps to 3 % numPartitions
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(stringKey.substring(offset + 1)) % numPartitions;
        }
        return partition;
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}
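
    Only the partitioner is shown above; as a companion, here is a minimal sketch of what a custom serializer could look like against the 0.9 org.apache.kafka.common.serialization.Serializer interface (SimpleStringSerializer is a hypothetical name, not part of the original demo):

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

public class SimpleStringSerializer implements Serializer<String> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }

    @Override
    public byte[] serialize(String topic, String data) {
        // encode the payload as UTF-8 bytes; null stays null so Kafka can store tombstones
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}

    It would be wired in through the key.serializer/value.serializer properties, exactly like the built-in StringSerializer used earlier.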

    The examples above are based on the official demo. Spring also provides spring-integration-kafka for integrating with Kafka; see https://github.com/spring-projects/spring-integration-kafka for details.