tar xvf jdk1.8.0_231.tar.gz -C /usr/local && cd /usr/local ln -sv jdk1.8.0_231 jdk vim /etc/profile.d/java.sh JAVA_HOME=/usr/local/jdk PATH=$JAVA_HOME/bin:$PATH
vim /usr/local/kafka/zookeeper/conf/zoo.cfg tickTime=2000 initLimit=10 syncLimit=5 dataDir=/data/zookeeper clientPort=2181 maxClientCnxns=0 # 集群版的zookeeper添加以下配置 # server.1=ip1:2888:3888 # server.2=ip2:2888:3888 # server.3=ip3:28888:3888
wget https://archive.apache.org/dist/kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz tar xvf kafka_2.11-0.10.2.1.tgz -C /usr/local && cd /usr/local ln -sv kafka_2.11-0.10.2.1.tgz kafka
vim /usr/local/kafka/bin/kafka-server-start.sh export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G"
/usr/local/kafka/bin/zookeeper-server-start.sh -deamon /usr/local/kafka/conf/zookeeper.properties /usr/local/kafka/bin/kafka-server-start.sh -deamon /usr/local/kafka/conf/server.properties /usr/local/kafka/bin/kafka-server-stop.sh /usr/local/kafka/conf/server.properties /usr/local/kafka/bin/zookeeper-server-stop.sh /usr/local/kafka/conf/zookeeper.properties
/usr/local/zookeeper/bin/zkServer.sh stop|stop
须要一个解析到内网ip地址的域名,内网环境也能够设置/etc/hostsjava
host.name=kafka.test.com(对应的域名解析须要解到内网ip)
高版本已弃用。低版本0.10.2.1能够用, 仅当listeners属性未配置时被使用,已用listeners属性代替。表示broker的hostnameapache
advertised.listeners=PLAINTEXT://kafka.test.com:9092(高版本用,替代host.name,设置了advertised.listeners不用设置host.name)
注册到zookeeper上并提供给客户端的监听器,若是没有配置则使用listeners。bootstrap
advertised.host.name(不须要设置,仅做参考)
已弃用。仅当advertised.listeners或者listeners属性未配置时被使用。官网建议使用advertised.listenersvim
listeners(不须要设置,仅做参考)
须要监听的URL和协议,如:PLAINTEXT://myhost:9092,SSL://:9091 CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093。若是未指定该配置,则使用java.net.InetAddress.getCanonicalHostName()函数的的返回值ruby
[内网ip] kafka.test.com服务器
[外网ip] kafka.test.comide
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list IP:9092 --topic TOPIC
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server IP:9092 --topic TOPIC--from-beginning --max-messages 1 /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 外网IP:9092 --topic TOPIC --from-beginning --max-messages 1
output { stdout { codec => rubydebug { metadata => true } } }
a、topics_pattern 通配问题".* ","."必定不能少函数
topics_pattern=>"prefix-.*"
b、filter中匹配规则,注意要能匹配到kafka中topic,不一样的filebeat和不一样的logstash版本对应的topic元数据可能不太同样,这点须要注意.net
if [type] =~ "prefix-*" { grok { match =>["[type]","^prefix-(?<index_name>)"] } } if [kafka][topic] =~ "prefix-*" { grok { match => [ "[kafka][topic]", "^prefix-(?<index_name>.*$)" ]} } if [@metadata][topic] =~ "prefix-*" { grok { match =>["[@metadata][topic]","^prefix-(?<index_name>)"] } } if [@metadata][kafka][topic] =~ "prefix-*" { grok { match => [ "[@metadata][kafka][topic]", "^prefix-(?<index_name>.*$)" ]} }
外网kakfa消费参考:https://www.maiyewang.com/archives/17993debug