Configuring and Using Kafka on Windows

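Kafka was originally developed at LinkedIn. It is a distributed messaging system that supports partitions and multiple replicas and is coordinated through ZooKeeper. Its defining strength is processing large volumes of data in real time, which suits a wide range of scenarios: Hadoop-based batch processing, low-latency real-time systems, Storm/Spark stream processing engines, web/nginx logs, access logs, messaging services, and so on. It is written in Scala and Java. LinkedIn open-sourced it in 2011 and contributed it to the Apache Software Foundation, where it became a top-level project. I am not an expert on the theory myself, so here is a well-written article on it: https://blog.csdn.net/YChenFeng/article/details/74980531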

Now let's get started with configuration and usage.

1. Download

Download page: http://kafka.apache.org/downloads.html . I used kafka_2.12-1.0.0.tgz. If you prefer, you can download the latest version instead.

2. After the download finishes, extract the archive. The directory structure looks like this:

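The bin directory holds the startup scripts, config holds the configuration files, logs is a log directory I created myself, and libs holds the jar files Kafka uses.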

Among the startup scripts there is a windows folder. Its scripts are used to start the services directly on Windows.

3. Edit the configuration

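Go to the config directory and edit server.properties. Find log.dirs and set it to a directory under your own installation, for example log.dirs=D:\\Tools\\kafka_2.11-1.0.0\\logs (backslashes must be doubled in a .properties file). Then find zookeeper.connect and set zookeeper.connect=localhost:2181, which points Kafka at a ZooKeeper instance running on the local machine.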

(By default, Kafka listens on port 9092 and connects to ZooKeeper on its default port, 2181.)
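The doubled backslashes in log.dirs are easy to get wrong, so here is a minimal sketch of what happens to them. It assumes server.properties is read with the standard java.util.Properties rules; the class name ServerPropertiesDemo and the helper parse are hypothetical and only serve the illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Illustration only: shows how the .properties format treats backslashes in Windows paths. */
final class ServerPropertiesDemo {

    /** Parses .properties text with the same rules java.util.Properties applies to a file. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // File content with doubled backslashes (the Java string literal doubles them once more).
        String escaped = "log.dirs=D:\\\\Tools\\\\kafka_2.11-1.0.0\\\\logs\n"
                + "zookeeper.connect=localhost:2181\n";
        // File content with single backslashes, as one might type it by mistake.
        String unescaped = "log.dirs=D:\\Tools\\kafka_2.11-1.0.0\\logs\n";

        System.out.println(parse(escaped).getProperty("log.dirs"));          // D:\Tools\kafka_2.11-1.0.0\logs
        System.out.println(parse(unescaped).getProperty("log.dirs"));        // D:Toolskafka_2.11-1.0.0logs
        System.out.println(parse(escaped).getProperty("zookeeper.connect")); // localhost:2181
    }
}
```

With single backslashes the separators are silently dropped, so the log directory would end up somewhere unexpected. Forward slashes (D:/Tools/...) are another way to sidestep the escaping.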

4. Start the services by running the following commands in a Command Prompt (DOS) window

D:\tools\kafka_2.12-2.1.0\bin\windows\zookeeper-server-start.bat D:\tools\kafka_2.12-2.1.0\config\zookeeper.properties

Start the ZooKeeper service first. The command above loads the ZooKeeper configuration file. Kafka depends on ZooKeeper to monitor its state.

D:\tools\kafka_2.12-2.1.0\bin\windows\kafka-server-start.bat D:\tools\kafka_2.12-2.1.0\config\server.properties

Then start the Kafka service, which loads its own configuration file.
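Once both windows are running, a quick way to confirm that the services are actually listening is to try a TCP connection to each port. The following is a minimal sketch using only the JDK. The class name PortCheck and the method isListening are hypothetical, and the ports are the defaults mentioned earlier (2181 for ZooKeeper, 9092 for Kafka). A successful connection only shows that something is listening on the port, not that it is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Illustration only: checks whether a TCP port accepts connections. */
final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing reachable on that port.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("ZooKeeper (2181): " + isListening("localhost", 2181, 1000));
        System.out.println("Kafka (9092): " + isListening("localhost", 9092, 1000));
    }
}
```

If either line prints false, check the corresponding console window for startup errors before moving on.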

 

5. Use IntelliJ IDEA to clone the kafka-monitor code from GitHub: https://github.com/linxin26/kafka-monitor

Run Start, then open http://127.0.0.1:5050 in a browser.

View the existing topics

View the brokers

6. Write a test class to inspect the setup

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

/**
 * Created by songxiaofei on 2018-12-27.
 */
public class ConsumerMessage {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Address of the Kafka broker; change it to match your environment.
        props.put("bootstrap.servers", "10.1.9.3:9092");
        props.put("group.id", "test");
        // Commit offsets automatically once per second.
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("buy"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                // Move the offset to the very beginning of each assigned partition.
                consumer.seekToBeginning(collection);
            }
        });

        // poll(long) and checksum() are the kafka-clients 0.10.0.1 API;
        // later client versions deprecate and eventually remove both.
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s, checksum = %s%n",
                        record.offset(), record.key(), record.value(), record.checksum());
            }
        }
    }
}

Run the code and check the log output, which shows the Kafka configuration the consumer is using:

D:\tools\Java\jdk1.8.0_31\bin\java "-javaagent:D:\tools\JetBrains\IntelliJ IDEA 2018.1\lib\idea_rt.jar=64422:D:\tools\JetBrains\IntelliJ IDEA 2018.1\bin" -Dfile.encoding=UTF-8 -classpath D:\tools\Java\jdk1.8.0_31\jre\lib\charsets.jar;...;E:\jetproject\kafka-monitor-master\target\classes;E:\mavrepository\.m2\repository\org\apache\kafka\kafka-clients\0.10.0.1\kafka-clients-0.10.0.1.jar;...;E:\mavrepository\.m2\repository\org\springframework\spring-expression\5.0.4.RELEASE\spring-expression-5.0.4.RELEASE.jar co.solinx.kafka.monitor.ConsumerMessage
log4j:WARN No such property [maxFileSize] in org.apache.log4j.DailyRollingFileAppender.
log4j:WARN No such property [maxBackupIndex] in org.apache.log4j.DailyRollingFileAppender.
2019-01-16 15:56:36,600 [main] [org.apache.kafka.clients.consumer.ConsumerConfig]-[INFO] ConsumerConfig values:
    metric.reporters = []
    metadata.max.age.ms = 300000
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [10.1.9.3:9092]
    ssl.keystore.type = JKS
    enable.auto.commit = true
    sasl.mechanism = GSSAPI
    interceptor.classes = null
    exclude.internal.topics = true
    ssl.truststore.password = null
    client.id =
    ssl.endpoint.identification.algorithm = null
    max.poll.records = 2147483647
    check.crcs = true
    request.timeout.ms = 40000
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 1000
    receive.buffer.bytes = 65536
    ssl.truststore.type = JKS
    ssl.truststore.location = null
    ssl.keystore.password = null
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    group.id = test
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    session.timeout.ms = 30000
    metrics.num.samples = 2
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = PLAINTEXT
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    auto.offset.reset = latest
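A dump this long is tedious to scan by eye. As a rough sketch, the hypothetical helper below (ConsumerConfigDump is not part of Kafka) parses such output into a map so that individual settings can be looked up. It assumes the log is in its usual form with one "key = value" setting per line.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustration only: turns a "key = value" config dump into a map. */
final class ConsumerConfigDump {

    /** Parses one setting per line; lines without '=' (such as the log header) are skipped. */
    static Map<String, String> parse(String dump) {
        Map<String, String> values = new LinkedHashMap<>();
        for (String line : dump.split("\\R")) {
            int eq = line.indexOf('=');
            if (eq < 0) {
                continue;
            }
            String key = line.substring(0, eq).trim();
            String value = line.substring(eq + 1).trim();
            if (!key.isEmpty()) {
                values.put(key, value);
            }
        }
        return values;
    }

    public static void main(String[] args) {
        // A few lines copied from the log output; paste the full dump to inspect everything.
        String dump = "ConsumerConfig values:\n"
                + "    bootstrap.servers = [10.1.9.3:9092]\n"
                + "    group.id = test\n"
                + "    auto.offset.reset = latest\n";
        Map<String, String> config = parse(dump);
        System.out.println("auto.offset.reset -> " + config.get("auto.offset.reset"));
        System.out.println("group.id -> " + config.get("group.id"));
    }
}
```

One setting worth noticing is auto.offset.reset = latest: a consumer group with no committed offset starts at the end of the log, which is presumably why the test class calls seekToBeginning when partitions are assigned.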

7. Create a topic from the command line

D:\tools\kafka_2.12-2.1.0\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hello

List the existing topics

D:\tools\kafka_2.12-2.1.0\bin\windows\kafka-topics.bat --list --zookeeper 127.0.0.1:2181
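If you would rather drive these commands from Java than type them each time, the argument list can be assembled in code. This is only a sketch: the class KafkaTopicsCommand and its methods are hypothetical, the installation directory and ZooKeeper address are parameters you must set to match your own setup, and only the flags already used in the commands above appear.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

/** Illustration only: builds kafka-topics.bat command lines for a Windows installation. */
final class KafkaTopicsCommand {

    private static String script(String kafkaHome) {
        return kafkaHome + "\\bin\\windows\\kafka-topics.bat";
    }

    /** Arguments for creating a topic. */
    static List<String> create(String kafkaHome, String zookeeper, String topic,
                               int partitions, int replicationFactor) {
        return Arrays.asList(script(kafkaHome), "--create", "--zookeeper", zookeeper,
                "--replication-factor", String.valueOf(replicationFactor),
                "--partitions", String.valueOf(partitions),
                "--topic", topic);
    }

    /** Arguments for listing the existing topics. */
    static List<String> list(String kafkaHome, String zookeeper) {
        return Arrays.asList(script(kafkaHome), "--list", "--zookeeper", zookeeper);
    }

    /** Runs the command, forwarding its output to this console, and returns the exit code. */
    static int run(List<String> command) throws IOException, InterruptedException {
        return new ProcessBuilder(command).inheritIO().start().waitFor();
    }

    public static void main(String[] args) throws Exception {
        // Assumed values; adjust both to your installation.
        String kafkaHome = "D:\\tools\\kafka_2.12-2.1.0";
        String zookeeper = "localhost:2181";

        List<String> command = create(kafkaHome, zookeeper, "hello", 1, 1);
        System.out.println(String.join(" ", command));
        // Only execute when explicitly asked to, since it needs a local Kafka installation.
        if (args.length > 0 && args[0].equals("run")) {
            System.exit(run(command));
        }
    }
}
```

Without arguments the program only prints the command line, which is a safe way to check it before passing "run" to execute it.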

 

Create a message producer and a message consumer. The consumer receives the messages that the producer sends.

D:\tools\kafka_2.12-2.1.0\bin\windows\kafka-console-producer.bat --broker-list 127.0.0.1:9092 --topic hello

This creates a message producer.

D:\tools\kafka_2.12-2.1.0\bin\windows\kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic hello --from-beginning

This creates a message consumer.