java提供了方便的API进行kafka消息处理。简单总结一下:html
学习参考:http://www.itnose.net/st/6095038.htmljava
POM配置(关于LOG4J的配置参看 http://www.cnblogs.com/huayu0815/p/5341712.html)apache
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.2.0</version> <exclusions> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>1.1.2</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-access</artifactId> <version>1.1.2</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.1.2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>log4j-over-slf4j</artifactId> <version>1.7.7</version> </dependency> </dependencies>
PRODUCERapi
import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import java.util.Properties; public class KafkaProducer { Producer<String, String> producer ; /*#指定kafka节点列表,用于获取metadata,没必要所有指定 metadata.broker.list=192.168.2.105:9092,192.168.2.106:9092 # 指定分区处理类。默认kafka.producer.DefaultPartitioner,表经过key哈希到对应分区 #partitioner.class=com.meituan.mafka.client.producer.CustomizePartitioner # 是否压缩,默认0表示不压缩,1表示用gzip压缩,2表示用snappy压缩。压缩后消息中会有头来指明消息压缩类型,故在消费者端消息解压是透明的无需指定。 compression.codec=none # 指定序列化处理类(mafka client API调用说明-->3.序列化约定wiki),默认为kafka.serializer.DefaultEncoder,即byte[] serializer.class=com.meituan.mafka.client.codec.MafkaMessageEncoder # serializer.class=kafka.serializer.DefaultEncoder # serializer.class=kafka.serializer.StringEncoder # 若是要压缩消息,这里指定哪些topic要压缩消息,默认empty,表示不压缩。 #compressed.topics= ########### request ack ############### # producer接收消息ack的时机.默认为0. # 0: producer不会等待broker发送ack # 1: 当leader接收到消息以后发送ack # 2: 当全部的follower都同步消息成功后发送ack. request.required.acks=0 # 在向producer发送ack以前,broker容许等待的最大时间 # 若是超时,broker将会向producer发送一个error ACK.意味着上一次消息由于某种 # 缘由未能成功(好比follower未能同步成功) request.timeout.ms=10000 ########## end ##################### # 同步仍是异步发送消息,默认“sync”表同步,"async"表异步。异步能够提升发送吞吐量, # 也意味着消息将会在本地buffer中,并适时批量发送,可是也可能致使丢失未发送过去的消息 producer.type=sync ############## 异步发送 (如下四个异步参数可选) #################### # 在async模式下,当message被缓存的时间超过此值后,将会批量发送给broker,默认为5000ms # 此值和batch.num.messages协同工做. queue.buffering.max.ms = 5000 # 在async模式下,producer端容许buffer的最大消息量 # 不管如何,producer都没法尽快的将消息发送给broker,从而致使消息在producer端大量沉积 # 此时,若是消息的条数达到阀值,将会致使producer端阻塞或者消息被抛弃,默认为10000 queue.buffering.max.messages=20000 # 若是是异步,指定每次批量发送数据量,默认为200 batch.num.messages=500 # 当消息在producer端沉积的条数达到"queue.buffering.max.meesages"后 # 阻塞必定时间后,队列仍然没有enqueue(producer仍然没有发送出任何消息) # 此时producer能够继续阻塞或者将消息抛弃,此timeout值用于控制"阻塞"的时间 # -1: 无阻塞超时限制,消息不会被抛弃 # 0:当即清空队列,消息被抛弃 queue.enqueue.timeout.ms=-1 ################ end ############### # 当producer接收到error ACK,或者没有接收到ACK时,容许消息重发的次数 # 由于broker并无完整的机制来避免消息重复,因此当网络异常时(好比ACK丢失) # 有可能致使broker接收到重复的消息,默认值为3. message.send.max.retries=3 # producer刷新topic metada的时间间隔,producer须要知道partition leader的位置,以及当前topic的状况 # 所以producer须要一个机制来获取最新的metadata,当producer遇到特定错误时,将会当即刷新 # (好比topic失效,partition丢失,leader失效等),此外也能够经过此参数来配置额外的刷新机制,默认值600000 topic.metadata.refresh.interval.ms=60000*/ public Producer<String, String> getClient() { if (producer == null) { Properties props = new Properties() ; //此处配置的是kafka的端口 props.put("metadata.broker.list", "xxx.xxx.xxx.xxx:9092"); //配置value的序列化类 props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("producer.type", "async"); //配置key的序列化类 props.put("key.serializer.class", "kafka.serializer.StringEncoder"); props.put("request.required.acks", "0"); ProducerConfig config = new ProducerConfig(props) ; producer = new Producer<>(config) ; } return producer ; } public void shutdown(){ if (producer != null) { producer.close(); } } public static void main(String[] args) throws CloneNotSupportedException { KafkaProducer kafkaProducer = new KafkaProducer() ; for (int i=0 ; i< 10; i ++) { kafkaProducer.getClient().send(new KeyedMessage<String, String>("topic1","topic1_" + i + "_测试")); kafkaProducer.getClient().send(new KeyedMessage<String, String>("topic2","topic2_" + i + "_测试")); } kafkaProducer.shutdown(); } }
总结:缓存
一、producer每次new的时候,会自动建立线程池bash
二、producer在调用send方法时候,才会真正创建socket链接。服务器
链接过程以下:网络
1>、经过metadata.broker.list获取对应的brokers全量信息(metadata.broker.list给的broker的ip和端口只要保证一个是可用的便可,无需所有列出。不过开发过程当中,通常所有列出)。session
2>、根据zookeeper的注册信息获取topic的分区信息多线程
3>、创建client和broker的socket链接
三、send结束后,直接关闭socket链接。
四、每次send会从新创建链接
五、client会自动获取topic的分区信息,所以kafka rebalance的时候,是不受影响的
CONSUMER
consumer api官方有两种,通常称为:high-level Consumer API 和 SimpleConsumer API 。
使用第二种的弊端:
我主要尝试了一下第一种也是大多数状况下使用的API。
使用high-level Consumer api,有两种用法:单个消费者和多个消费者
单消费者:
import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; public class KafkaSingleConsumer { /** * # zookeeper链接服务器地址,此处为线下测试环境配置(kafka消息服务-->kafka broker集群线上部署环境wiki) # 配置例子:"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" zookeeper.connect=192.168.2.225:2181,192.168.2.225:2182,192.168.2.225:2183/config/mobile/mq/mafka # zookeeper的session过时时间,默认5000ms,用于检测消费者是否挂掉,当消费者挂掉,其余消费者要等该指定时间才能检查到而且触发从新负载均衡 zookeeper.session.timeout.ms=5000 zookeeper.connection.timeout.ms=10000 # 指定多久消费者更新offset到zookeeper中。注意offset更新时基于time而不是每次得到的消息。一旦在更新zookeeper发生异常并重启,将可能拿到已拿到过的消息 zookeeper.sync.time.ms=2000 #指定消费组 group.id=xxx # 当consumer消费必定量的消息以后,将会自动向zookeeper提交offset信息 # 注意offset信息并非每消费一次消息就向zk提交一次,而是如今本地保存(内存),并按期提交,默认为true auto.commit.enable=true # 自动更新时间。默认60 * 1000 auto.commit.interval.ms=1000 # 当前consumer的标识,能够设定,也能够有系统生成,主要用来跟踪消息消费状况,便于观察 conusmer.id=xxx # 消费者客户端编号,用于区分不一样客户端,默认客户端程序自动产生 client.id=xxxx # 最大取多少块缓存到消费者(默认10) queued.max.message.chunks=50 # 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新 # 的consumer上,若是一个consumer得到了某个partition的消费权限,那么它将会向zk注册 # "Partition Owner registry"节点信息,可是有可能此时旧的consumer尚没有释放此节点, # 此值用于控制,注册节点的重试次数. rebalance.max.retries=5 # 获取消息的最大尺寸,broker不会像consumer输出大于此值的消息chunk # 每次feth将获得多条消息,此值为总大小,提高此值,将会消耗更多的consumer端内存 fetch.min.bytes=6553600 # 当消息的尺寸不足时,server阻塞的时间,若是超时,消息将当即发送给consumer fetch.wait.max.ms=5000 socket.receive.buffer.bytes=655360 # 若是zookeeper没有offset值或offset值超出范围。那么就给个初始的offset。有smallest、largest、 # anything可选,分别表示给当前最小的offset、当前最大的offset、抛异常。默认largest auto.offset.reset=smallest # 指定序列化处理类(mafka client API调用说明-->3.序列化约定wiki),默认为kafka.serializer.DefaultDecoder,即byte[] derializer.class=com.meituan.mafka.client.codec.MafkaMessageDecoder */ public static void main(String args[]) { String topic = "topic1" ; Properties props = new Properties(); props.put("zookeeper.connect", "xxx.xxx.xxx:2181"); props.put("group.id", "testgroup"); props.put("zookeeper.session.timeout.ms", "500"); props.put("zookeeper.sync.time.ms", "250"); props.put("auto.commit.interval.ms", "1000"); ConsumerConfig config = new ConsumerConfig(props); ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config) ; Map<String, Integer> topicMap = new HashMap<>(); // Define single thread for topic topicMap.put(topic, new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap = consumer.createMessageStreams(topicMap); List<KafkaStream<byte[], byte[]>> streamList = consumerStreamsMap.get(topic); for (KafkaStream<byte[], byte[]> stream : streamList) { ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator(); while (consumerIte.hasNext()) System.out.println("Message from Single Topic :: " + new String(consumerIte.next().message())); } if (consumer != null) consumer.shutdown(); } }
多消费者
import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class KafkaMultiConsumer { /** * # zookeeper链接服务器地址,此处为线下测试环境配置(kafka消息服务-->kafka broker集群线上部署环境wiki) # 配置例子:"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" zookeeper.connect=192.168.2.225:2181,192.168.2.225:2182,192.168.2.225:2183/config/mobile/mq/mafka # zookeeper的session过时时间,默认5000ms,用于检测消费者是否挂掉,当消费者挂掉,其余消费者要等该指定时间才能检查到而且触发从新负载均衡 zookeeper.session.timeout.ms=5000 zookeeper.connection.timeout.ms=10000 # 指定多久消费者更新offset到zookeeper中。注意offset更新时基于time而不是每次得到的消息。一旦在更新zookeeper发生异常并重启,将可能拿到已拿到过的消息 zookeeper.sync.time.ms=2000 #指定消费组 group.id=xxx # 当consumer消费必定量的消息以后,将会自动向zookeeper提交offset信息 # 注意offset信息并非每消费一次消息就向zk提交一次,而是如今本地保存(内存),并按期提交,默认为true auto.commit.enable=true # 自动更新时间。默认60 * 1000 auto.commit.interval.ms=1000 # 当前consumer的标识,能够设定,也能够有系统生成,主要用来跟踪消息消费状况,便于观察 conusmer.id=xxx # 消费者客户端编号,用于区分不一样客户端,默认客户端程序自动产生 client.id=xxxx # 最大取多少块缓存到消费者(默认10) queued.max.message.chunks=50 # 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新 # 的consumer上,若是一个consumer得到了某个partition的消费权限,那么它将会向zk注册 # "Partition Owner registry"节点信息,可是有可能此时旧的consumer尚没有释放此节点, # 此值用于控制,注册节点的重试次数. rebalance.max.retries=5 # 获取消息的最大尺寸,broker不会像consumer输出大于此值的消息chunk # 每次feth将获得多条消息,此值为总大小,提高此值,将会消耗更多的consumer端内存 fetch.min.bytes=6553600 # 当消息的尺寸不足时,server阻塞的时间,若是超时,消息将当即发送给consumer fetch.wait.max.ms=5000 socket.receive.buffer.bytes=655360 # 若是zookeeper没有offset值或offset值超出范围。那么就给个初始的offset。有smallest、largest、 # anything可选,分别表示给当前最小的offset、当前最大的offset、抛异常。默认largest auto.offset.reset=smallest # 指定序列化处理类(mafka client API调用说明-->3.序列化约定wiki),默认为kafka.serializer.DefaultDecoder,即byte[] derializer.class=com.meituan.mafka.client.codec.MafkaMessageDecoder */ public static void main(String args[]) { String topic = "topic1" ; int threadCount = 3; Properties props = new Properties(); props.put("zookeeper.connect", "xxx.xxx.xxx.xxx:2181"); props.put("group.id", "testgroup"); props.put("zookeeper.session.timeout.ms", "500"); props.put("zookeeper.sync.time.ms", "250"); props.put("auto.commit.interval.ms", "1000"); ConsumerConfig config = new ConsumerConfig(props); ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config) ; Map<String, Integer> topicMap = new HashMap<>(); // Define single thread for topic topicMap.put(topic, 3); ExecutorService executor = Executors.newFixedThreadPool(threadCount); Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap = consumer.createMessageStreams(topicMap); List<KafkaStream<byte[], byte[]>> streamList = consumerStreamsMap.get(topic); int count = 0; for (final KafkaStream<byte[], byte[]> stream : streamList) { final String threadNumber = "Thread" + count ; executor.execute(new Runnable() { @Override public void run() { ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator(); while (consumerIte.hasNext()) { System.out.println("Thread Number " + threadNumber + ": " + new String(consumerIte.next().message())); } } }); count++ ; } } }
总结:
一、KAFKA容许多个consumer group,每一个group容许多个consumer。不一样group之间共享信息(相似发布-订阅模式),同一个group之间的多个consumer只会消费消息一次(相似生产-消费者模式)。
二、对同一个topic启动多个java consumer线程,在zookeeper上能够看到多个信息:
[zk:xxx.xxx.xxx.xxx:2181(CONNECTED) 120] ls /consumers/testgroup/ids [testgroup_xxx-1459926903849-fea50e90, testgroup_xxx-1459926619712-8d1caf90]
三、若是多线程方式启动consumer,能够看到不一样的consumer绑定到不一样的topic patition上
[zk: xxx.xxx.xxx.xxx:2181(CONNECTED) 121] get /consumers/testgroup/owners/topic1/1 testgroup_xxx-1459926619712-8d1caf90-1 cZxid = 0x2000006e2 ctime = Wed Apr 06 03:15:04 EDT 2016 mZxid = 0x2000006e2 mtime = Wed Apr 06 03:15:04 EDT 2016 pZxid = 0x2000006e2 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x153413bc26e007e dataLength = 44 numChildren = 0 [zk: xxx.xxx.xxx.xxx:2181(CONNECTED) 122] get /consumers/testgroup/owners/topic1/0 testgroup_xxx-1459926619712-8d1caf90-0 cZxid = 0x2000006e3 ctime = Wed Apr 06 03:15:04 EDT 2016 mZxid = 0x2000006e3 mtime = Wed Apr 06 03:15:04 EDT 2016 pZxid = 0x2000006e3 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x153413bc26e007e dataLength = 44 numChildren = 0
四、对于启动多个consumer进程或是以多线程方式启动单个consumer进程,区别仅仅在与zookeeper上注册的consumer信息是多个或是一个“ls /consumers/testgroup/ids ”,可是对于消息的消费而言,都遵照消费只消费一次,同一个分区只会绑定一个consumer信息。
五、若是某个消费者挂掉的话,consumer和partition的绑定信息会从新分配,尽量的保证负载平衡
六、若是consumer的数量大于分区数量,会形成多余的那部分线程没法获取消息,不断 Got ping response for sessionid: 0x153413bc26e0082 after 2ms。是一种资源的浪费
若是多台服务器都启动consumer进程,最好根据分区数合理分配consumer进程中,消费线程的数量
更底层的细节问题,后期遇到再继续调研,先会用,明白大体原理!