Flume official documentation
Flume 1.6.0 User Guide
Kafka official documentation
Kafka Documentation
Apache Flume is a distributed, reliable, and available system for efficiently collecting, aggregating and moving large amounts of log data from many different sources to a centralized data store.
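Flume is a distributed log collection system: it gathers data from many servers and delivers it to a destination.
Flume collects data from a data source (source), buffers it (channel), then delivers the collected data to the specified destination (sink), and finally deletes its own buffered copy.
The unit that moves through this pipeline is an Event, defined as a unit of data flow with a byte-array payload and an optional set of string attributes. It comprises the event headers, the event body, and the event information.
A Flume agent is a (JVM) process that hosts the components through which events flow from an external source to the next destination.
An agent is built from three core components: source, channel, and sink.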
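The source, channel, sink flow described above can be sketched in a few lines of Python. This is only an illustrative model, not Flume's implementation: the names Event, MemoryChannel and run_sink_once are hypothetical, and the "destination" is just a list. It shows the one point that matters here, namely that the buffered copy is removed only after delivery.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class Event:
    """A unit of data flow: a byte payload plus optional string headers."""
    body: bytes
    headers: dict = field(default_factory=dict)


class MemoryChannel:
    """Hypothetical bounded in-memory buffer between a source and a sink."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self._queue = deque()

    def put(self, event: Event) -> None:
        # The source side: refuse new events once the buffer is full.
        if len(self._queue) >= self.capacity:
            raise OverflowError("channel is full")
        self._queue.append(event)

    def take(self, max_events: int) -> list:
        # Peek at up to max_events events without removing them.
        return list(self._queue)[:max_events]

    def commit(self, count: int) -> None:
        # Drop events only after the sink has delivered them.
        for _ in range(count):
            self._queue.popleft()

    def __len__(self) -> int:
        return len(self._queue)


def run_sink_once(channel: MemoryChannel, destination: list, batch_size: int) -> int:
    """Deliver one batch to the destination, then delete it from the channel."""
    batch = channel.take(batch_size)
    destination.extend(batch)   # deliver first
    channel.commit(len(batch))  # then remove the buffered copy
    return len(batch)
```

Putting three events into a channel and calling run_sink_once with batch_size=2 moves the first two events to the destination and leaves one in the buffer.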
Kafka® is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.
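Kafka was originally developed at LinkedIn. It is a distributed messaging system that supports partitioning (partition) and multiple replicas (replica) and is coordinated through ZooKeeper. Its defining feature is the ability to process large volumes of data in real time to meet a wide range of use cases.
Messaging traditionally has two models: queuing and publish-subscribe. In a queue, a pool of consumers reads from the server and each message is handled by exactly one of them. In publish-subscribe, each message is broadcast to all consumers, and every consumer that receives it can process it. Kafka offers a single consumer abstraction that covers both models: the consumer group. Each consumer labels itself with a consumer group name.
Each message published to a topic is delivered to one consumer within each subscribing consumer group. If all consumers are in the same group, this behaves like a queue. If every consumer is in a different group, it behaves like publish-subscribe. More generally, we can create a number of consumer groups that act as logical subscribers. Each group contains a varying number of consumers, and having several consumers in one group provides scalability and fault tolerance.
Kafka also guarantees that messages sent by a producer to a particular topic partition are appended in the order they are sent. That is, if messages M1 and M2 are sent by the same producer and M1 is sent first, M1 will have a lower offset than M2 and appear earlier in the log. Consumers see messages in that same order. If a topic is configured with a replication factor of N, up to N-1 servers can fail without losing any committed messages. These properties give Kafka stronger ordering guarantees than a traditional messaging system. However, a consumer group cannot usefully contain more consumers than there are partitions: the extra consumers sit idle and receive no messages.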
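To make the consumer group semantics concrete, here is a small simulation. It is a sketch under simplifying assumptions: the Topic class and its methods are hypothetical, and it picks a consumer per message in round-robin order, whereas real Kafka assigns whole partitions to group members. What it does capture is the rule that each message reaches exactly one consumer in every subscribing group.

```python
class Topic:
    """Toy model: one message goes to exactly one consumer per group."""

    def __init__(self):
        self._groups = {}  # group name -> list of consumer names
        self._next = {}    # group name -> count of messages delivered so far

    def subscribe(self, group: str, consumer: str) -> None:
        self._groups.setdefault(group, []).append(consumer)
        self._next.setdefault(group, 0)

    def publish(self, message: str) -> dict:
        """Return {group: consumer} showing who receives this message."""
        delivered = {}
        for group, members in self._groups.items():
            index = self._next[group] % len(members)
            delivered[group] = members[index]
            self._next[group] += 1
        return delivered


def queue_example() -> list:
    # All consumers share one group: messages are split between them.
    topic = Topic()
    topic.subscribe("g", "c1")
    topic.subscribe("g", "c2")
    return [topic.publish("m1"), topic.publish("m2")]


def pubsub_example() -> dict:
    # Every consumer has its own group: each one sees every message.
    topic = Topic()
    topic.subscribe("g1", "c1")
    topic.subscribe("g2", "c2")
    return topic.publish("m1")
```

queue_example shows the queue case (m1 goes to c1, m2 goes to c2), while pubsub_example shows both consumers receiving the same message.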
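The three guarantees in this paragraph (offset order within a partition, N-1 tolerated failures, idle consumers when a group is larger than the partition count) can be illustrated with a toy model. All names below are hypothetical, and the round-robin assignment is an assumption made for the example rather than Kafka's actual assignor.

```python
class PartitionLog:
    """Append-only log; an offset is the position of a message in the log."""

    def __init__(self):
        self._messages = []

    def append(self, message: str) -> int:
        self._messages.append(message)
        return len(self._messages) - 1  # offset assigned to this message

    def read_from(self, offset: int) -> list:
        # Consumers read in the same order the messages were appended.
        return self._messages[offset:]


def tolerated_failures(replication_factor: int) -> int:
    """With N replicas, N-1 servers may fail without losing committed data."""
    return replication_factor - 1


def assign_partitions(partitions: list, consumers: list) -> dict:
    """Give each partition exactly one owner within the group."""
    assignment = {consumer: [] for consumer in consumers}
    for i, partition in enumerate(partitions):
        owner = consumers[i % len(consumers)]
        assignment[owner].append(partition)
    return assignment
```

With two partitions and three consumers in one group, assign_partitions leaves the third consumer with an empty list, which is the idle consumer the text warns about.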
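Monitor a file and ship newly appended data to Kafka in real time
Flume uses an exec source + memory channel + Kafka sink
The Flume agent configuration is stored in a local configuration file. The file contains the properties of each source, sink, and channel in the agent and describes how they are wired together to form a data flow.
Below, $FLUME_HOME and $KAFKA_HOME refer to the Flume and Kafka installation directories.
Go to $FLUME_HOME/conf, create exec-memory-kafca.conf, and configure it.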
type:
The component type name, needs to be exec
command:
The command to execute
type:
Must be set to org.apache.flume.sink.kafka.KafkaSink
brokerList:
List of brokers Kafka-Sink will connect to, to get the list of topic partitions. This can be a partial list of brokers, but we recommend at least two for HA. The format is a comma-separated list of hostname:port.
topic:
Default: default-flume-topic. The topic in Kafka to which the messages will be published. If this parameter is configured, messages will be published to this topic. If the event header contains a “topic” field, the event will be published to that topic overriding the topic configured here.
batchSize:
How many messages to process in one batch. Larger batches improve throughput while adding latency.
requiredAcks:
How many replicas must acknowledge a message before it is considered successfully written. Accepted values are 0 (never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas). Set this to -1 to avoid data loss in some cases of leader failure.
See the official documentation for the full list of configuration options.
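Two of the sink properties above describe behaviour that is easy to misread: the topic header override and batchSize. The following sketch models both. The function names are hypothetical and this is not the KafkaSink source code. It only restates the documented rules: a "topic" header wins over the configured topic, and events are handled in groups of at most batchSize.

```python
def resolve_topic(headers: dict, configured_topic: str = "default-flume-topic") -> str:
    """A "topic" entry in the event headers overrides the configured topic."""
    return headers.get("topic", configured_topic)


def batches(events: list, batch_size: int):
    """Yield consecutive groups of at most batch_size events."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(events), batch_size):
        yield events[start:start + batch_size]
```

For example, resolve_topic({}, "kafka-test") returns "kafka-test", while resolve_topic({"topic": "audit"}, "kafka-test") returns "audit". Five events with a batch size of 2 are processed as groups of 2, 2 and 1.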
# Name the components of the agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /Users/null/data/flume-test.log
a1.sources.r1.channels = c1

# Configure the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = kafka-test
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 2
a1.sinks.k1.channel = c1

# Configure the memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
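A common mistake in a file like this is a source or sink that points at a channel that was never declared. Note that a source uses the plural key channels while a sink uses the singular channel. The helper below is a hypothetical pre-flight check, not part of Flume: it parses the properties text and reports wiring problems. SAMPLE_CONFIG is a shortened copy of the configuration above.

```python
SAMPLE_CONFIG = """
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /Users/null/data/flume-test.log
a1.sources.r1.channels = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = kafka-test
a1.sinks.k1.channel = c1
a1.channels.c1.type = memory
"""


def parse_properties(text: str) -> dict:
    """Parse key = value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_wiring(props: dict, agent: str) -> list:
    """Return a list of problems; an empty list means the wiring is consistent."""
    problems = []
    declared = set(props.get(f"{agent}.channels", "").split())
    for source in props.get(f"{agent}.sources", "").split():
        # Sources may write to several channels (plural key).
        for channel in props.get(f"{agent}.sources.{source}.channels", "").split():
            if channel not in declared:
                problems.append(f"source {source} uses undeclared channel {channel}")
    for sink in props.get(f"{agent}.sinks", "").split():
        # A sink reads from exactly one channel (singular key).
        channel = props.get(f"{agent}.sinks.{sink}.channel")
        if channel not in declared:
            problems.append(f"sink {sink} uses undeclared channel {channel}")
    return problems
```

Calling check_wiring(parse_properties(SAMPLE_CONFIG), "a1") returns an empty list. Changing a1.sinks.k1.channel to an undeclared name makes it report that sink.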
Start ZooKeeper
Kafka requires a running ZooKeeper environment
bin/zkServer.sh start
Start Kafka
bin/kafka-server-start.sh config/server.properties
Create the topic
kafka-test is the topic name and must match a1.sinks.k1.topic in the Flume configuration
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka-test
kafka-topics.sh --list --zookeeper localhost:2181
Start a consumer
kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka-test --from-beginning
Start Flume
--conf specifies the directory containing the configuration files, --conf-file specifies the configuration file to use, and --name is the agent name
flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec-memory-kafca.conf --name a1 -Dflume.root.logger=INFO,console
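Once the agent and the console consumer are running, the pipeline can be exercised by appending lines to the file that the exec source tails. The helper below is a hypothetical test-data generator, not part of Flume or Kafka. It assumes the monitored path from the configuration above, /Users/null/data/flume-test.log, which may differ on your machine.

```python
import time
from pathlib import Path


def append_lines(path: str, lines: list, delay_seconds: float = 0.0) -> int:
    """Append each line to the file and return how many lines were written."""
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    written = 0
    with target.open("a", encoding="utf-8") as handle:
        for line in lines:
            handle.write(line + "\n")
            handle.flush()  # make the line visible to tail -F right away
            written += 1
            if delay_seconds:
                time.sleep(delay_seconds)
    return written


if __name__ == "__main__":
    # Assumed path: must match a1.sources.r1.command in the agent config.
    append_lines("/Users/null/data/flume-test.log",
                 [f"test message {i}" for i in range(10)],
                 delay_seconds=0.5)
```

If everything is wired correctly, each appended line should appear in the console consumer's output shortly after it is written.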