Kafka学习笔记-Java简单操做

Maven依赖包:java

<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.8.2.1</version>
</dependency>
		
<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka_2.11</artifactId>
	<version>0.8.2.1</version>
</dependency>

代码以下:

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaProducerTest {
	
	private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerTest.class);
	
	private static Properties properties = null;
	
	static {
		properties = new Properties();
		properties.put("bootstrap.servers", "centos.master:9092,centos.slave1:9092,centos.slave2:9092");
		properties.put("producer.type", "sync");
		properties.put("request.required.acks", "1");
		properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
		properties.put("partitioner.class", "kafka.producer.DefaultPartitioner");
		properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
//		properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
//		properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
	}
	
	public void produce() {
		KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<byte[],byte[]>(properties);
		ProducerRecord<byte[],byte[]> kafkaRecord = new ProducerRecord<byte[],byte[]>(
				"test", "kkk".getBytes(), "vvv".getBytes());
		kafkaProducer.send(kafkaRecord, new Callback() {
			public void onCompletion(RecordMetadata metadata, Exception e) {
				if(null != e) {
					LOG.info("the offset of the send record is {}", metadata.offset());
					LOG.error(e.getMessage(), e);
				}
				LOG.info("complete!");
			}
		});
		kafkaProducer.close();
	}

	public static void main(String[] args) {
		KafkaProducerTest kafkaProducerTest = new KafkaProducerTest();
		for (int i = 0; i < 10; i++) {
			kafkaProducerTest.produce();
		}
	}
}

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumerTest {
	
	private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerTest.class);
	
	public static void main(String[] args) {
		Properties properties = new Properties();
		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
				"centos.master:9092,centos.slave1:9092,centos.slave2:9092");
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");            
		properties.put(ConsumerConfig.SESSION_TIMEOUT_MS, "1000");            
		properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
		properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");
//		properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "roundrobin");
		properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10000");  
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
				"org.apache.kafka.common.serialization.ByteArrayDeserializer");
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
				"org.apache.kafka.common.serialization.ByteArrayDeserializer");
		
		KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<byte[], byte[]>(properties);
		kafkaConsumer.subscribe("test");
//		kafkaConsumer.subscribe("*");
		boolean isRunning = true;            
		while(isRunning) {
			Map<String, ConsumerRecords<byte[], byte[]>> results = kafkaConsumer.poll(100);
			if (null != results) {
				for (Map.Entry<String, ConsumerRecords<byte[], byte[]>> entry : results.entrySet()) {
					LOG.info("topic {}", entry.getKey());
					ConsumerRecords<byte[], byte[]> consumerRecords = entry.getValue();
					List<ConsumerRecord<byte[], byte[]>> records = consumerRecords.records();
					for (int i = 0, len = records.size(); i < len; i++) {
						ConsumerRecord<byte[], byte[]> consumerRecord = records.get(i);
						LOG.info("topic {} partition {}", consumerRecord.topic(), consumerRecord.partition());
						try {
							LOG.info("offset {} value {}", consumerRecord.offset(), new String(consumerRecord.value()));
						} catch (Exception e) {
							LOG.error(e.getMessage(), e);
						}
					}
				}
			}
		}
		
		kafkaConsumer.close();  
		
	}

}


发现KafkaConsumer的poll方法未实现apache

@Override
public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
     // TODO Auto-generated method stub
     return null;
}

后改成kafka.javaapi.consumer.SimpleConsumer实现,正常运行
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class KafkaSimpleConsumerTest {
	
	private List<String> borkerList = new ArrayList<String>();  
	  
    public KafkaSimpleConsumerTest() {  
        borkerList = new ArrayList<String>();  
    }  
  
    public static void main(String args[]) {  
        KafkaSimpleConsumerTest kafkaSimpleConsumer = new KafkaSimpleConsumerTest();  
        // 最大读取消息数量  
        long maxReadNum = Long.parseLong("3");  
        // 订阅的topic  
        String topic = "test";  
        // 查找的分区  
        int partition = Integer.parseInt("0");  
        // broker节点
        List<String> seeds = new ArrayList<String>();  
        seeds.add("centos.master");  
        seeds.add("centos.slave1");  
        seeds.add("centos.slave2");  
        // 端口  
        int port = Integer.parseInt("9092");  
        try {  
            kafkaSimpleConsumer.run(maxReadNum, topic, partition, seeds, port);  
        } catch (Exception e) {  
            System.out.println("Oops:" + e);  
            e.printStackTrace();  
        }  
    }  
  
    public void run(long maxReadNum, String topic, int partition, List<String> seedBrokers, int port) throws Exception {  
        // 获取指定topic partition的元数据  
        PartitionMetadata metadata = findLeader(seedBrokers, port, topic, partition);  
        if (metadata == null) {  
            System.out.println("can't find metadata for topic and partition. exit");  
            return;  
        }  
        if (metadata.leader() == null) {  
            System.out.println("can't find leader for topic and partition. exit");  
            return;  
        }  
        String leadBroker = metadata.leader().host();  
        String clientName = "client_" + topic + "_" + partition;  
  
        SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);  
        long readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime(), clientName);  
        int numErrors = 0;  
        while (maxReadNum > 0) {  
            if (consumer == null) {  
                consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);  
            }  
            FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(topic, partition, readOffset, 100000).build();  
            FetchResponse fetchResponse = consumer.fetch(req);  
  
            if (fetchResponse.hasError()) {  
                numErrors++;  
                short code = fetchResponse.errorCode(topic, partition);  
                System.out.println("error fetching data from the broker:" + leadBroker + " reason: " + code);  
                if (numErrors > 5)  
                    break;  
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {  
                    readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);  
                    continue;  
                }  
                consumer.close();  
                consumer = null;  
                leadBroker = findNewLeader(leadBroker, topic, partition, port);  
                continue;  
            }  
            numErrors = 0;  
  
            long numRead = 0;  
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {  
                long currentOffset = messageAndOffset.offset();  
                if (currentOffset < readOffset) {  
                    System.out.println("found an old offset: " + currentOffset + " expecting: " + readOffset);  
                    continue;  
                }  
  
                readOffset = messageAndOffset.nextOffset();  
                ByteBuffer payload = messageAndOffset.message().payload();  
  
                byte[] bytes = new byte[payload.limit()];  
                payload.get(bytes);  
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));  
                numRead++;  
                maxReadNum--;  
            }  
  
            if (numRead == 0) {  
                try {  
                    Thread.sleep(1000);  
                } catch (InterruptedException ie) {  
                }  
            }  
        }  
        if (consumer != null)  
            consumer.close();  
    }  
   
    /**
     * 从活跃的Broker列表中找出指定Topic、Partition中的Leader Broker
     * @param seedBrokers
     * @param port
     * @param topic
     * @param partition
     * @return
     */
    private PartitionMetadata findLeader(List<String> seedBrokers, int port, String topic, int partition) {  
        PartitionMetadata partitionMetadata = null;  
        loop: for (String seedBroker : seedBrokers) {  
            SimpleConsumer consumer = null;  
            try {  
                consumer = new SimpleConsumer(seedBroker, port, 100000, 64 * 1024, "leaderLookup");  
                List<String> topics = Collections.singletonList(topic);  
                TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(topics);  
                TopicMetadataResponse topicMetadataResponse = consumer.send(topicMetadataRequest);  
  
                List<TopicMetadata> topicMetadatas = topicMetadataResponse.topicsMetadata();  
                for (TopicMetadata topicMetadata : topicMetadatas) {  
                    for (PartitionMetadata pMetadata : topicMetadata.partitionsMetadata()) {  
                        if (pMetadata.partitionId() == partition) {  
                            partitionMetadata = pMetadata;  
                            break loop;  
                        }  
                    }  
                }  
            } catch (Exception e) {  
                System.out.println("error communicating with broker [" + seedBroker + "] to find leader for [" + topic + ", " + partition + "] reason: " + e);  
            } finally {  
                if (consumer != null)  
                    consumer.close();  
            }  
        }  
        if (partitionMetadata != null) {  
            borkerList.clear();  
            for (Broker replica : partitionMetadata.replicas()) {  
                borkerList.add(replica.host());  
            }  
        }  
        return partitionMetadata;  
    }  
  
    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {  
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);  
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();  
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));  
        OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);  
        OffsetResponse response = consumer.getOffsetsBefore(request);  
        if (response.hasError()) {  
            System.out.println("error fetching data offset data the broker. reason: " + response.errorCode(topic, partition));  
            return 0;  
        }  
        long[] offsets = response.offsets(topic, partition);  
        return offsets[0];  
    }  
  
    private String findNewLeader(String oldLeader, String topic, int partition, int port) throws Exception {  
        for (int i = 0; i < 3; i++) {  
            boolean goToSleep = false;  
            PartitionMetadata metadata = findLeader(borkerList, port, topic, partition);  
            if (metadata == null) {  
                goToSleep = true;  
            } else if (metadata.leader() == null) {  
                goToSleep = true;  
            } else if (oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {  
                goToSleep = true;  
            } else {  
                return metadata.leader().host();  
            }  
            if (goToSleep) {  
                try {  
                    Thread.sleep(1000);  
                } catch (InterruptedException ie) {  
                }  
            }  
        }  
        System.out.println("unable to find new leader after broker failure. exit");  
        throw new Exception("unable to find new leader after broker failure. exit");  
    }  
  
}