1、java
一、原理图shell
二、原理描述markdown
一个topic 能够配置几个partition,produce发送的消息分发到不一样的partition中,consumer接受数据的时候是按照group来接受,kafka确保每一个partition只能同一个group中的同一个consumer消费,若是想要重复消费,那么须要其余的组来消费。Zookeerper中保存这每一个topic下的每一个partition在每一个group中消费的offset
新版kafka把这个offsert保存到了一个__consumer_offsert的topic下
这个__consumer_offsert 有50个分区,经过将group的id哈希值%50的值来肯定要保存到那一个分区. 这样也是为了考虑到zookeeper不擅长大量读写的缘由。
因此,若是要一个group用几个consumer来同时读取的话,须要多线程来读取,一个线程至关于一个consumer实例。当consumer的数量大于分区的数量的时候,有的consumer线程会读取不到数据。
假设一个topic test 被groupA消费了,如今启动另一个新的groupB来消费test,默认test-groupB的offset不是0,而是没有新创建,除非当test有数据的时候,groupB会收到该数据,该条数据也是第一条数据,groupB的offset也是刚初始化的ofsert, 除非用显式的用–from-beginnging 来获取从0开始数据 多线程
三、查看topic-group的offsert 负载均衡
位置:zookeeper
路径:[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics/__consumer_offsets/partitions
在zookeeper的topic中有一个特殊的topic __consumer_offserts
计算方法:(放入哪一个partitions)ide
int hashCode = Math.abs("ttt".hashCode());工具
int partition = hashCode % 50;oop
先计算group的hashCode,再除以分区数(50),能够获得partition的值 post
使用命令查看: kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"性能
4.参数
auto.offset.reset:默认值为largest,表明最新的消息,smallest表明从最先的消息开始读取,当consumer刚开始建立的时候没有offset这种状况,若是设置了largest,则为当收到最新的一条消息的时候开始记录offsert,若设置为smalert,那么会从头开始读partition
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
import
kafka.producer.Partitioner;
import
kafka.utils.VerifiableProperties;
public
class
JasonPartitioner<T>
implements
Partitioner {
public
JasonPartitioner(VerifiableProperties verifiableProperties) {}
@Override
public
int
partition(Object key,
int
numPartitions) {
try
{
int
partitionNum = Integer.parseInt((String) key);
return
Math.abs(Integer.parseInt((String) key) % numPartitions);
}
catch
(Exception e) {
return
Math.abs(key.hashCode() % numPartitions);
}
}
}
|
若是将上例中的类做为partition.class,并经过以下代码发送20条消息(key分别为0,1,2,3)至topic3(包含4个Partition)。
1
2
3
4
5
6
7
8
9
10
|
public
void
sendMessage()
throws
InterruptedException{
for
(
int
i =
1
; i <=
5
; i++){
List messageList =
new
ArrayList<KeyedMessage<String, String>>();
for
(
int
j =
0
; j <
4
; j++){
messageList.add(
new
KeyedMessage<String, String>(
"topic2"
, j+
""
,
"The "
+ i +
" message for key "
+ j));
}
producer.send(messageList);
}
producer.close();
}
|
则key相同的消息会被发送并存储到同一个partition里,并且key的序号正好和Partition序号相同。(Partition序号从0开始,本例中的key也从0开始)。下图所示是经过Java程序调用Consumer后打印出的消息列表。
四、consumer group (本节全部描述都是基于Consumer hight level API而非low level API)。
使用Consumer high level API时,同一Topic的一条消息只能被同一个Consumer Group内的一个Consumer消费,但多个Consumer Group可同时消费这一消息。
这是Kafka用来实现一个Topic消息的广播(发给全部的Consumer)和单播(发给某一个Consumer)的手段。一个Topic能够对应多个Consumer Group。若是须要实现广播,只要每一个Consumer有一个独立的Group就能够了。要实现单播只要全部的Consumer在同一个Group里。用Consumer Group还能够将Consumer进行自由的分组而不须要屡次发送消息到不一样的Topic。
实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还能够同时将数据实时备份到另外一个数据中心,只须要保证这三个操做所使用的Consumer属于不一样的Consumer Group便可。
下面这个例子更清晰地展现了Kafka Consumer Group的特性。首先建立一个Topic (名为topic1,包含3个Partition),而后建立一个属于group1的Consumer实例,并建立三个属于group2的Consumer实例,最后经过Producer向topic1发送key分别为1,2,3的消息。结果发现属于group1的Consumer收到了全部的这三条消息,同时group2中的3个Consumer分别收到了key为1,2,3的消息。