相关文章:html
大数据系列之Kafka安装 apache
大数据系列之Flume--几种不一样的Sourcesbootstrap
大数据系列之Flume+HDFSpost
关于Flume 的 一些核心概念:大数据
组件名称 | 功能介绍 |
Agent代理 | 使用JVM 运行Flume。每台机器运行一个agent,可是能够在一个agent中包含多个sources和sinks。 |
Client客户端 | 生产数据,运行在一个独立的线程。 |
Source源 | 从Client收集数据,传递给Channel。 |
Sink接收器 | 从Channel收集数据,进行相关操做,运行在一个独立线程。 |
Channel通道 | 链接 sources 和 sinks ,这个有点像一个队列。 |
Events事件 | 传输的基本数据负载。 |
1. kafka.properties:url
agent.sources = s1 agent.channels = c1 agent.sinks = k1 agent.sources.s1.type=exec agent.sources.s1.command=tail -F /tmp/logs/kafka.log agent.sources.s1.channels=c1 agent.channels.c1.type=memory agent.channels.c1.capacity=10000 agent.channels.c1.transactionCapacity=100 #设置Kafka接收器 agent.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink #设置Kafka的broker地址和端口号 agent.sinks.k1.brokerList=master:9092 #设置Kafka的Topic agent.sinks.k1.topic=kafkatest #设置序列化方式 agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder agent.sinks.k1.channel=c1
关于配置文件中注意3点:线程
a. agent.sources.s1.command=tail -F /tmp/logs/kafka.log 3d
b. agent.sinks.k1.brokerList=master:9092代理
c . agent.sinks.k1.topic=kafkatest code
2.很明显,由配置文件能够了解到:
a.咱们须要在/tmp/logs下建一个kafka.log的文件,且向文件中输出内容(下面会说到);
b.flume链接到kafka的地址是 master:9092,注意不要配置出错了;
c.flume会将采集后的内容输出到Kafka topic 为kafkatest上,因此咱们启动zk,kafka后须要打开一个终端消费topic kafkatest的内容。这样就能够看到flume与kafka之间玩起来了~~
3.具体操做:
a.在/tmp/logs下创建空文件kafka.log。在mfz 用户目录下新建脚本kafkaoutput.sh(必定要给予可执行权限),用来向kafka.log输入内容: kafka_test***
for((i=0;i<=1000;i++)); do echo "kafka_test-"+$i>>/tmp/logs/kafka.log; done
b. 在kafka安装目录下执行以下命令,启动zk,kafka 。(不明白此处可参照 大数据系列之Flume+HDFS)
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties &
bin/kafka-server-start.sh -daemon config/server.properties &
c.新增Topic kafkatest
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkatest
d.打开新终端,在kafka安装目录下执行以下命令,生成对topic kafkatest 的消费
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafkatest --from-beginning --zookeeper master
e.启动flume
bin/flume-ng agent --conf-file conf/kafka.properties -c conf/ --name agent -Dflume.root.logger=DEBUG,console
d.执行kafkaoutput.sh脚本(注意观察kafka.log内容及消费终端接收到的内容)
e.查看新终端消费信息
总体流程如图:
完~~
后续将介绍Java代码对于Flume+HDFS ,Flume+Kafka的实现。敬请期待~