1. Install Zookeeper
References: Downloading, installing, and starting Zookeeper
Setting up a Zookeeper cluster: a pseudo-distributed cluster on a single machine
2. Download Kafka
Go to http://kafka.apache.org/downloads
The version used here is kafka_2.11-1.0.1.tgz
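3. Kafka directory layout
Extract the archive under /usr/local: tar -zxvf kafka_2.11-1.0.1.tgz
/bin executable scripts for operating Kafka
/config directory containing the configuration files
/libs dependency libraries
/logs log data directory. Kafka splits the server-side logs into: server, request, state, log-cleaner, controller
Create the log directory: cd /usr/local/kafka_2.11-1.0.1 && mkdir kafkaLogs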
4. Configuration
(1) Configure Zookeeper
(2) Configure Kafka
Open config/server.properties and set:
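# Broker id; must be unique for every machine in the cluster. The other two brokers use 1 and 2.
broker.id=0
# Interface Kafka binds to. Use this machine's internal IP address, otherwise binding the port fails.
# The other two brokers use 192.168.1.5 and 192.168.1.9.
host.name=192.168.1.3
# IP and port registered in Zookeeper and exposed to clients. 118.212.149.51 is the public IP of 192.168.1.3.
# Without this setting, a Kafka broker deployed on a public server cannot be reached from a local machine.
advertised.listeners=PLAINTEXT://118.212.149.51:9092
# IP and port of each Zookeeper cluster node. Zookeeper cluster tutorial:
zookeeper.connect=192.168.1.3:2181,192.168.1.5:2181,192.168.1.9:2181
# Log directory, the one created above.
log.dirs=/usr/local/kafka_2.11-1.0.1/kafkaLogs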
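Before starting the broker it can help to sanity-check these values. The following is a minimal sketch, not part of Kafka: the class ServerConfigCheck and its method names are made up for illustration, the settings are inlined as a string instead of being read from config/server.properties, and it assumes advertised.listeners holds a single listener (Kafka also accepts a comma-separated list).

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration only; it is not part of Kafka.
class ServerConfigCheck {

    // The settings from this section, inlined so the example runs without a file.
    static final String SAMPLE = String.join("\n",
            "broker.id=0",
            "host.name=192.168.1.3",
            "advertised.listeners=PLAINTEXT://118.212.149.51:9092",
            "zookeeper.connect=192.168.1.3:2181,192.168.1.5:2181,192.168.1.9:2181",
            "log.dirs=/usr/local/kafka_2.11-1.0.1/kafkaLogs");

    // Parses properties-file text with the standard java.util.Properties loader.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Splits zookeeper.connect into its individual host:port entries.
    static List<String> zookeeperNodes(Properties props) {
        List<String> nodes = new ArrayList<>();
        for (String node : props.getProperty("zookeeper.connect", "").split(",")) {
            if (!node.trim().isEmpty()) {
                nodes.add(node.trim());
            }
        }
        return nodes;
    }

    // Port of the advertised listener (assumes exactly one listener is configured).
    static int advertisedPort(Properties props) {
        return URI.create(props.getProperty("advertised.listeners")).getPort();
    }

    public static void main(String[] args) {
        Properties props = load(SAMPLE);
        // Integer.parseInt throws if broker.id is not a number.
        System.out.println("broker.id       = " + Integer.parseInt(props.getProperty("broker.id")));
        System.out.println("zookeeper nodes = " + zookeeperNodes(props));
        System.out.println("advertised port = " + advertisedPort(props));
    }
}
```

To check a real file, the string passed to load could be replaced by the contents of config/server.properties.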
5. Start Kafka
(1) Start Kafka
./kafka-server-start.sh ../config/server.properties
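Kafka may fail to start because of insufficient memory. The default heap size is 1 GB; if the machine has little memory, lower it.
In kafka-server-start.sh under bin, change
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
to export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"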
This starts a single Kafka broker. When deploying a Kafka cluster, start each broker separately.
(2) Check the broker id
Log in to Zookeeper:
./zkCli.sh -server 127.0.0.1:2181
Here 0 is the broker id.
View the broker information:
get /brokers/ids/0
6. Create a topic
(1) Create the topic test1
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1
List all topics:
./kafka-topics.sh --list --zookeeper localhost:2181
7. View topic details
./kafka-topics.sh --describe --zookeeper localhost:2181
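The first line is a summary of the topic: topic name (Topic), number of partitions (PartitionCount), replication factor (ReplicationFactor), and configuration (Configs).
The following lines list every partition of topic test1. The fields are, in order: topic name (Topic), partition number (Partition), the broker that leads this partition (Leader), the brokers that hold replicas (Replicas),
and the ISR list (in-sync replicas), which can be thought of as the substitute players. Not every broker can be a substitute: the broker must hold a replica of the partition, and that replica must also be sufficiently caught up with the leader.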
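The partition fields described above can be pulled apart programmatically. The sketch below is illustrative only: DescribeLineParser is a made-up class, and the sample line is a hypothetical three-replica partition written in the "Name: value" layout that kafka-topics.sh --describe is assumed to print; it is not output captured from this setup.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; it is not part of Kafka.
class DescribeLineParser {

    // Matches "Name: value" pairs such as "Leader: 1" or "Replicas: 1,2,0".
    private static final Pattern FIELD = Pattern.compile("(\\w+):\\s*(\\S+)");

    // Returns the fields of one partition line, in the order they appear.
    static Map<String, String> parse(String line) {
        Map<String, String> fields = new LinkedHashMap<>();
        Matcher m = FIELD.matcher(line);
        while (m.find()) {
            fields.put(m.group(1), m.group(2));
        }
        return fields;
    }

    // A broker is a "substitute player" only if its id is listed under Isr.
    static boolean isInSync(Map<String, String> fields, String brokerId) {
        for (String id : fields.getOrDefault("Isr", "").split(",")) {
            if (id.equals(brokerId)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Hypothetical line: broker 0 holds a replica but has fallen out of the ISR.
        String line = "Topic: test1\tPartition: 0\tLeader: 1\tReplicas: 1,2,0\tIsr: 1,2";
        Map<String, String> fields = parse(line);
        System.out.println(fields);
        System.out.println("broker 2 in sync: " + isInSync(fields, "2"));
        System.out.println("broker 0 in sync: " + isInSync(fields, "0"));
    }
}
```

In the sample line broker 0 appears under Replicas but not under Isr, which is the case of a broker that holds a replica yet does not qualify as a substitute.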
8. Produce messages
./kafka-console-producer.sh --broker-list PLAINTEXT://47.xx.47.120:9092 --topic testTopic1
testTopic1 is the topic name.
9. Consume messages
./kafka-console-consumer.sh --bootstrap-server PLAINTEXT://47.xx.xx.120:9092 --topic testTopic1 --from-beginning
10. Java in practice
(1) Add the dependencies
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.1</version>
</dependency>
(2) Producer
import java.util.Properties;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.Logger;

public class Producer {
    static Logger log = Logger.getLogger(Producer.class);

    private static final String TOPIC = "test";
    private static final String BROKER_LIST = "47.xx.xx.120:9092";
    private static KafkaProducer<String, String> producer = null;

    /* Initialize the producer */
    static {
        Properties configs = initConfig();
        producer = new KafkaProducer<String, String>(configs);
    }

    /* Initialize the configuration */
    private static Properties initConfig() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return properties;
    }

    public static void main(String[] args) throws InterruptedException {
        // Message record
        ProducerRecord<String, String> record = null;
        for (int i = 0; i < 5; i++) {
            record = new ProducerRecord<String, String>(TOPIC, "product value" + (int) (10 * (Math.random())));
            // Send the message; the callback runs once the send succeeds or fails
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (null != e) {
                        // Log failures at error level and keep the stack trace
                        log.error("send error: " + e.getMessage(), e);
                    } else {
                        System.out.println(String.format("offset:%s,partition:%s", recordMetadata.offset(), recordMetadata.partition()));
                    }
                }
            });
        }
        // close() waits for the pending sends to complete
        producer.close();
    }
}
Start the producer. The console prints:
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
2019-08-01 13:52:22.494 INFO --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
    acks = 1
    batch.size = 16384
    bootstrap.servers = [47.xx.xx.120:9092]
    buffer.memory = 33554432
    client.id =
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2019-08-01 13:52:22.609 INFO --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.1
2019-08-01 13:52:22.609 INFO --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : c0518aa65f25317e
2019-08-01 13:52:23.174 INFO --- [ main] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
offset:15,partition:0
offset:16,partition:0
offset:17,partition:0
offset:18,partition:0
offset:19,partition:0
Process finished with exit code 0
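The ProducerConfig dump shows the defaults in effect for this run: acks = 1 (only the partition leader acknowledges a write) and retries = 0 (a failed send is not retried). Where losing a message matters more than latency, stricter values can be set. The following is a minimal sketch under assumptions: the class ProducerSettings is made up for illustration, and the retry count of 3 is an arbitrary choice, not a recommendation from the source.

```java
import java.util.Properties;

// Hypothetical helper for illustration only; it just builds a Properties object.
class ProducerSettings {

    // Producer settings that favor durability over latency.
    static Properties reliable(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Wait until all in-sync replicas have the record, not just the leader.
        props.put("acks", "all");
        // Retry transient failures instead of giving up immediately (3 is arbitrary).
        props.put("retries", "3");
        // With retries enabled, one in-flight request per connection keeps records in order.
        props.put("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        Properties props = reliable("47.xx.xx.120:9092");
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + " = " + props.getProperty(name));
        }
    }
}
```

The returned Properties could be passed to new KafkaProducer<String, String>(props) in place of the result of initConfig().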
(3) Consumer
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;

public class Consumer {
    // Use the Consumer class as the logger name so its records are not attributed to Producer
    static Logger log = Logger.getLogger(Consumer.class);

    private static final String TOPIC = "test";
    private static final String BROKER_LIST = "47.xx.xx.120:9092";
    private static KafkaConsumer<String, String> consumer = null;

    /* Initialize the consumer and subscribe to the topic */
    static {
        Properties configs = initConfig();
        consumer = new KafkaConsumer<String, String>(configs);
        consumer.subscribe(Arrays.asList(TOPIC));
    }

    /* Initialize the configuration */
    private static Properties initConfig() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", BROKER_LIST);
        properties.put("group.id", "0");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Commit offsets automatically in the background
        properties.setProperty("enable.auto.commit", "true");
        // Start from the earliest offset when the group has no committed offset
        properties.setProperty("auto.offset.reset", "earliest");
        return properties;
    }

    public static void main(String[] args) {
        while (true) {
            // Wait up to 10 ms for new records
            ConsumerRecords<String, String> records = consumer.poll(10);
            for (ConsumerRecord<String, String> record : records) {
                log.info(record);
            }
        }
    }
}
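After running it, the console prints the following. The records appear under the logger name Producer because the run that produced this output created its logger with Logger.getLogger(Producer.class).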
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
2019-08-01 13:52:43.837 INFO --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [47.xx.xx.120:9092]
    check.crcs = true
    client.id =
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = 0
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2019-08-01 13:52:43.972 INFO --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.1
2019-08-01 13:52:43.972 INFO --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : c0518aa65f25317e
2019-08-01 13:52:44.350 INFO --- [ main] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=0] Discovered group coordinator 47.xx.xx.120:9092 (id: 2147483647 rack: null)
2019-08-01 13:52:44.356 INFO --- [ main] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=0] Revoking previously assigned partitions []
2019-08-01 13:52:44.356 INFO --- [ main] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=0] (Re-)joining group
2019-08-01 13:52:44.729 INFO --- [ main] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=0] Successfully joined group with generation 3
2019-08-01 13:52:44.730 INFO --- [ main] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=0] Setting newly assigned partitions [test-0]
2019-08-01 13:52:45.033 INFO --- [ main] c.e.filesmanager.utils.kafka.Producer : ConsumerRecord(topic = test, partition = 0, offset = 15, CreateTime = 1564638743167, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = product value6)
2019-08-01 13:52:45.033 INFO --- [ main] c.e.filesmanager.utils.kafka.Producer : ConsumerRecord(topic = test, partition = 0, offset = 16, CreateTime = 1564638743174, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = product value7)
2019-08-01 13:52:45.033 INFO --- [ main] c.e.filesmanager.utils.kafka.Producer : ConsumerRecord(topic = test, partition = 0, offset = 17, CreateTime = 1564638743174, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = product value4)
2019-08-01 13:52:45.033 INFO --- [ main] c.e.filesmanager.utils.kafka.Producer : ConsumerRecord(topic = test, partition = 0, offset = 18, CreateTime = 1564638743174, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = product value4)
2019-08-01 13:52:45.033 INFO --- [ main] c.e.filesmanager.utils.kafka.Producer : ConsumerRecord(topic = test, partition = 0, offset = 19, CreateTime = 1564638743174, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = product value3)
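The consumer sets auto.offset.reset = earliest, yet this run starts reading at offset 15 rather than at the beginning of the partition. That is consistent with the group already having a committed offset: the reset policy only applies when no committed offset exists. The sketch below is a simplified model of that rule, not Kafka client code; the class StartOffset, the use of -1 for "no committed offset", and the assumption that the log spans offsets 0 to 20 are all made up for illustration, and the real client also applies the policy when a committed offset is out of range.

```java
// Simplified, hypothetical model of how a consumer picks its starting offset.
class StartOffset {

    // committed: the group's committed offset for the partition, or -1 if there is none.
    // logStart / logEnd: first available offset and the offset after the last record.
    static long resolve(long committed, long logStart, long logEnd, String autoOffsetReset) {
        if (committed >= 0) {
            // A committed offset always wins; auto.offset.reset is not consulted.
            return committed;
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStart;
            case "latest":
                return logEnd;
            default:
                // Models auto.offset.reset=none, where the real client raises an error.
                throw new IllegalStateException(
                        "no committed offset and auto.offset.reset=" + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // Group with a committed offset of 15: reading resumes there.
        System.out.println(resolve(15, 0, 20, "earliest"));
        // Brand-new group: the policy decides.
        System.out.println(resolve(-1, 0, 20, "earliest"));
        System.out.println(resolve(-1, 0, 20, "latest"));
    }
}
```

To re-read the topic from the start with the consumer above, one option is to use a group.id that has never committed offsets, so that the earliest policy takes effect.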