【Kafka】MirrorMaker 跨集群同步方案

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+mirroring+(MirrorMaker)java

1、正则表达式

Kafka's mirroring feature makes it possible to maintain a replica of an existing kafka cluster. The following diagram shows how to use the MirrorMaker tool to mirror a source kafka cluster into a target(mirror) kafka cluster.The tool uses a kafka comsumer to consume messages from the source cluster,and re-publishes those messages to the local(target)cluster using an embedded kafka producer.shell


2、apache

1.whitelist and blacklist 黑白名单异步

mirror-maker接受精确指定同步的topic的黑白名单。使用java的标准的正则表达式,为了方便,逗号(,)被编译成java正则的|async

2.producer timeoutcode

为了支持高吞吐量,你最好会用异步的内置producer,并将内置的producer设置为阻塞模式(queue.enqueueTimeout.ms=-1) 这样能够保证数据不会丢失。不然,异步producer的默认queue.enqueueTimeout.ms=0,若是producer内部的队列满了,数据会被丢弃,并抛出QueueFullException异常。而对于阻塞模式的producer,若是内部队列满了就会一直等到,从而有效的节制了内置的comsumer的消费速冻。你能够打开producer的trace logging,随时看到内部队列剩余量。server

3.Producer的重试次数retries队列


3、ip

Source Kafka Cluster

A:==> 配置zk
[root@hftest0001 conf]# pwd
/opt/zookeeper-3.4.6/conf

[root@hftest0001 conf]# ll
total 16
-rw------- 1 root root  535 Jan 28 13:27 configuration.xsl
-rw------- 1 root root 2161 Jan 28 13:27 log4j.properties
-rw------- 1 root root 1043 Jan 28 13:31 zoo.cfg
-rw------- 1 root root  922 Jan 28 13:27 zoo_sample.cfg

[root@hftest0001 conf]# cat zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper-3.4.6/data/
clientPort=2181

[root@hftest0001 config]# pwd
/opt/kafka_2.11-0.8.2.2/config

B:==> 配置kafka
[root@hftest0001 config]# cat server.properties
broker.id=201
port=9092
...
...
zookeeper.connect=${source_zk_ip}:2181
...

C:==> 启动source kafka
[root@hftest0001 kafka_2.11-0.8.2.2]# ./bin/kafka-server-start.sh config/server.properties &
...
...

D:==> 建立topic
[root@hftest0001 kafka_2.11-0.8.2.2]# ./bin/kafka-topics.sh --zookeeper 10.224.246.201:2181 --replication-factor 1  --partitions 1 --topic r1-p1-1 --create

E:==> 查看topic
[root@hftest0001 kafka_2.11-0.8.2.2]# ./bin/kafka-topics.sh --zookeeper ${source_zk_ip}:2181 --list
r1-p1-1
Target Kafka Cluster
一样的操做,step A,B,C,D,E

F:==>配置MirrorMaker
[root@hftest0004 config]# pwd
/opt/kafka_2.11-0.8.2.2/config

[root@hftest0004 config]# cat consumer.properties
zookeeper.connect=${source_zk_ip}:2181
zookeeper.connection.timeout.ms=6000
group.id=hftest-mirror-maker


[root@hftest0004 config]# cat producer.properties
metadata.broker.list=${target_broker_ip}:9092
producer.type=async
compression.codec=none
serializer.class=kafka.serializer.DefaultEncoder
############################# Async Producer #############################
queue.enqueue.timeout.ms=-1

[root@hftest0004 kafka_2.11-0.8.2.2]# ./bin/kafka-mirror-maker.sh --consumer.config config/consumer.properties --producer.config config/producer.properties ----num.producers 2 --whitelist r1-p1-1
校验:

启动一个producer向source kafka cluster push数据
启动一个consumer从source kafka cluster pull数据
启动一个consumer从target kafka cluster pull数据 ==> 看是否能pull到数据
相关文章
相关标签/搜索