[Translation] Flume 1.8.0 User Guide: Sources

Translated from the official Flume 1.8 user guide. Original: Flume 1.8.0 User Guide

Due to its length, the translation is split into the following five parts:

[Translation] Flume 1.8.0 User Guide

[Translation] Flume 1.8.0 User Guide: Sources

[Translation] Flume 1.8.0 User Guide: Sinks

[Translation] Flume 1.8.0 User Guide: Channels

[Translation] Flume 1.8.0 User Guide: Processors

Flume Sources

1 Avro source

Listens on an Avro port and receives events from external Avro client streams. When paired with the built-in Avro Sink on another (previous-hop) Flume agent, it can create tiered collection topologies. Required properties are in bold.

 

Property Name Default Description
channels  
type The component type name, needs to be avro
bind hostname or IP address to listen on
port Port # to bind to
threads Maximum number of worker threads to spawn
selector.type    
selector.*    
interceptors Space-separated list of interceptors
interceptors.*    
compression-type none This can be “none” or “deflate”. The compression-type must match the compression-type of matching AvroSource
ssl false Set this to true to enable SSL encryption. You must also specify a “keystore” and a “keystore-password”.
keystore This is the path to a Java keystore file. Required for SSL.
keystore-password The password for the Java keystore. Required for SSL.
keystore-type JKS The type of the Java keystore. This can be “JKS” or “PKCS12”.
exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
ipFilter false Set this to true to enable ipFiltering for netty
ipFilterRules Define N netty ipFilter pattern rules with this config.

Example for agent a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

Example of ipFilterRules:

ipFilterRules defines N netty ipFilters separated by commas; a pattern rule must be in this format:

<'allow' or 'deny'>:<'ip' or 'name' for computer name>:<pattern>, i.e. allow/deny:ip/name:pattern

Example: ipFilterRules=allow:ip:127.*,allow:name:localhost,deny:ip:*

Note that the first rule to match is the one applied, as the examples below show for a client on localhost.

"allow:name:localhost,deny:ip:*" allows the client on localhost and denies clients from any other IP, while "deny:name:localhost,allow:ip:*" denies the client on localhost and allows clients from any other IP.
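As an illustration of the first-match semantics above, here is a minimal Python sketch. It is a hypothetical re-implementation for clarity, not Flume's actual Netty filter, and the wildcard-to-regex translation is an assumption:

```python
import re

def build_rules(ip_filter_rules):
    """Parse a rule string like 'allow:ip:127.*,allow:name:localhost,deny:ip:*'
    into (allow?, kind, compiled-pattern) triples."""
    rules = []
    for rule in ip_filter_rules.split(","):
        action, kind, pattern = rule.strip().split(":", 2)
        # Translate the documented '*' wildcard into a regex (assumption).
        regex = re.compile(pattern.replace(".", r"\.").replace("*", ".*") + "$")
        rules.append((action == "allow", kind, regex))
    return rules

def is_allowed(rules, ip, hostname):
    """Apply the FIRST matching rule, as the docs describe."""
    for allow, kind, regex in rules:
        value = ip if kind == "ip" else hostname
        if regex.match(value):
            return allow
    return True  # assumption: if no rule matches, allow

rules = build_rules("allow:ip:127.*,allow:name:localhost,deny:ip:*")
allowed_local = is_allowed(rules, "127.0.0.1", "localhost")  # first rule matches
allowed_other = is_allowed(rules, "10.0.0.5", "otherhost")   # falls through to deny
```

Swapping the rule string for "deny:name:localhost,allow:ip:*" inverts the outcome for the two clients, matching the second example above.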

2 Thrift Source

Listens on a Thrift port and receives events from external Thrift client streams. When paired with the built-in ThriftSink on another (previous-hop) Flume agent, it can create tiered collection topologies. The Thrift source can be configured to start in secure mode by enabling kerberos authentication. agent-principal and agent-keytab are the properties used by the Thrift source to authenticate to the kerberos KDC. Required properties are in bold.

Property Name Default Description
channels  
type The component type name, needs to be thrift
bind hostname or IP address to listen on
port Port # to bind to
threads Maximum number of worker threads to spawn
selector.type    
selector.*    
interceptors Space separated list of interceptors
interceptors.*    
ssl false Set this to true to enable SSL encryption. You must also specify a “keystore” and a “keystore-password”.
keystore This is the path to a Java keystore file. Required for SSL.
keystore-password The password for the Java keystore. Required for SSL.
keystore-type JKS The type of the Java keystore. This can be “JKS” or “PKCS12”.
exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
kerberos false

Set to true to enable kerberos authentication. In kerberos mode, agent-principal and agent-keytab are required for successful authentication.

The Thrift source in secure mode, will accept connections only from Thrift clients that have kerberos enabled and are successfully authenticated to the kerberos KDC.

agent-principal The kerberos principal used by the Thrift Source to authenticate to the kerberos KDC.
agent-keytab The keytab location used by the Thrift Source in combination with the agent-principal to authenticate to the kerberos KDC.

Example for agent a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

3 Exec Source

Exec source runs a given Unix command on start-up and expects that process to continuously produce data on standard out (stderr is simply discarded, unless property logStdErr is set to true). If the process exits for any reason, the source also exits and will produce no further data. This means configurations such as cat [named pipe] or tail -F [file] are going to produce the desired results, whereas date probably will not: the former two commands produce streams of data, whereas the latter produces a single event and exits. Required properties are in bold.

Property Name Default Description
channels  
type The component type name, needs to be exec
command The command to execute
shell A shell invocation used to run the command. e.g. /bin/sh -c. Required only for commands relying on shell features like wildcards, back ticks, pipes etc.
restartThrottle 10000 Amount of time (in millis) to wait before attempting a restart
restart false Whether the executed cmd should be restarted if it dies
logStdErr false Whether the command’s stderr should be logged
batchSize 20 The max number of lines to read and send to the channel at a time
batchTimeout 3000 Amount of time (in milliseconds) to wait, if the buffer size was not reached, before data is pushed downstream
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    
Warning: The problem with ExecSource and other asynchronous sources is that the source cannot guarantee that the client knows about a failure to put an event into the Channel. In such cases, the data will be lost. For example, one of the most commonly requested features is the tail -F [file]-like use case, where an application writes to a log file on disk and Flume tails the file, sending each line as an event. While this is possible, there is an obvious problem: what happens if the Channel fills up and Flume can't send an event? Flume has no way of indicating to the application writing the log file that it needs to retain the log, or that an event has not been sent for some reason. If this doesn't make sense, you need only know this: your application can never guarantee data has been received when using a unidirectional asynchronous interface such as ExecSource! As an extension of this warning, and to be completely clear, there is absolutely zero guarantee of event delivery when using this source. For stronger reliability guarantees, consider the Spooling Directory Source, the Taildir Source, or direct integration with Flume via the SDK.

Example for agent a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

The 'shell' config is used to invoke the 'command' through a command shell (such as Bash or Powershell). The 'command' is passed as an argument to 'shell' for execution. This allows the 'command' to use features of the shell such as wildcards, back ticks, pipes, loops, conditionals, etc. In the absence of the 'shell' config, the 'command' will be invoked directly. Common values for 'shell': '/bin/sh -c', '/bin/ksh -c', 'cmd /c', 'powershell -Command', etc.

a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.shell = /bin/bash -c
a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done

4 JMS Source

JMS Source reads messages from a JMS destination such as a queue or topic. Being a JMS application, it should work with any JMS provider but has only been tested with ActiveMQ. The JMS source provides configurable batch size, message selector, user/pass, and message to flume event converter. Note that the vendor-provided JMS jars should be included in the Flume classpath using the plugins.d directory (preferred), -classpath on the command line, or via the FLUME_CLASSPATH variable in flume-env.sh. Required properties are in bold.

Property Name Default Description
channels  
type The component type name, needs to be jms
initialContextFactory Initial Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory
connectionFactory The JNDI name the connection factory should appear as
providerURL The JMS provider URL
destinationName Destination name
destinationType Destination type (queue or topic)
messageSelector Message selector to use when creating the consumer
userName Username for the destination/provider
passwordFile File containing the password for the destination/provider
batchSize 100 Number of messages to consume in one batch
converter.type DEFAULT Class to use to convert messages to flume events. See below.
converter.* Converter properties.
converter.charset UTF-8 Default converter only. Charset to use when converting JMS TextMessages to byte arrays.
createDurableSubscription false

Whether to create durable subscription. Durable subscription can only be used with destinationType topic.

If true, “clientId” and “durableSubscriptionName” have to be specified.

clientId JMS client identifier set on Connection right after it is created. Required for durable subscriptions.
durableSubscriptionName Name used to identify the durable subscription. Required for durable subscriptions.

4.1 Converters

The JMS source allows pluggable converters, though it's likely the default converter will work for most purposes. The default converter is able to convert Bytes, Text, and Object messages to FlumeEvents. In all cases, the properties in the message are added as headers to the FlumeEvent.
BytesMessage:
Bytes of the message are copied to the body of the FlumeEvent. Cannot convert more than 2GB of data per message.
TextMessage:
Text of the message is converted to a byte array and copied to the body of the FlumeEvent. The default converter uses UTF-8 by default, but this is configurable.
ObjectMessage:
The object is written out to a ByteArrayOutputStream wrapped in an ObjectOutputStream and the resulting array is copied to the body of the FlumeEvent.
Example for agent a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE

 

5 Spooling Directory Source

This source lets you ingest data by placing files to be ingested into a "spooling" directory on disk. The source will watch the specified directory for new files, and will parse events out of new files as they appear. The event parsing logic is pluggable. After a given file has been fully read into the channel, it is renamed to indicate completion (or optionally deleted).
Unlike the Exec source, this source is reliable and will not miss data, even if Flume is restarted or killed. In exchange for this reliability, only immutable, uniquely-named files must be dropped into the spooling directory. Flume tries to detect these problem conditions and will fail loudly if they are violated:
If a file is written to after being placed into the spooling directory, Flume will print an error to its log file and stop processing.
If a file name is reused at a later time, Flume will print an error to its log file and stop processing.
To avoid the above issues, it may be useful to add a unique identifier (such as a timestamp) to log file names when they are moved into the spooling directory.
Despite the reliability guarantees of this source, there are still cases in which events may be duplicated if certain downstream failures occur. This is consistent with the guarantees offered by other Flume components.

Property Name Default Description
channels  
type The component type name, needs to be spooldir.
spoolDir The directory from which to read files from.
fileSuffix .COMPLETED Suffix to append to completely ingested files
deletePolicy never When to delete completed files: never or immediate
fileHeader false Whether to add a header storing the absolute path filename.
fileHeaderKey file Header key to use when appending absolute path filename to event header.
basenameHeader false Whether to add a header storing the basename of the file.
basenameHeaderKey basename Header Key to use when appending basename of file to event header.
includePattern ^.*$

Regular expression specifying which files to include. It can be used together with ignorePattern. If a file matches both ignorePattern 

and includePattern regex, the file is ignored.

ignorePattern ^$

Regular expression specifying which files to ignore (skip). It can be used together with includePattern. If a file matches both ignorePattern

 and includePattern regex, the file is ignored.

trackerDir .flumespool Directory to store metadata related to processing of files. If this path is not an absolute path, then it is interpreted as relative to the spoolDir.
consumeOrder oldest

In which order files in the spooling directory will be consumed: oldest, youngest and random. In case of oldest and youngest,

the last modified time of the files will be used to compare the files. In case of a tie, the file with smallest lexicographical order will be consumed first.

In case of random any file will be picked randomly. When using oldest and youngest the whole directory will be scanned to

pick the oldest/youngest file,

which might be slow if there are a large number of files, while using random may cause old files to be consumed very late

if new files keep coming in the spooling directory.

pollDelay 500 Delay (in milliseconds) used when polling for new files.
recursiveDirectorySearch false Whether to monitor sub directories for new files to read.
maxBackoff 4000

The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full.

The source will start at a low backoff

and increase it exponentially each time the channel throws a ChannelException, up to the value specified by this parameter.

batchSize 100 Granularity at which to batch transfer to the channel
inputCharset UTF-8 Character set used by deserializers that treat the input file as text.
decodeErrorPolicy FAIL

What to do when we see a non-decodable character in the input file. FAIL: Throw an exception and fail to parse the file. 

REPLACE: Replace the unparseable

character with the “replacement character” char, typically Unicode U+FFFD. IGNORE: Drop the unparseable character sequence.

deserializer LINE

Specify the deserializer used to parse the file into events. Defaults to parsing each line as an event. The class specified must implement

 EventDeserializer.Builder.

deserializer.*   Varies per event deserializer.
bufferMaxLines (Obsolete) This option is now ignored.
bufferMaxLineLength 5000 (Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead.
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

Example for an agent named agent-1:

a1.channels = ch-1
a1.sources = src-1
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true

5.1 Event Deserializers

The following event deserializers ship with Flume.

5.1.1 LINE

This deserializer generates one event per line of text input.

Property Name Default Description
deserializer.maxLineLength 2048

Maximum number of characters to include in a single event. If a line exceeds this length,

it is truncated, and the remaining characters on the line will appear in a subsequent event.

deserializer.outputCharset UTF-8 Charset to use for encoding events put into the channel.

5.1.2 AVRO

This deserializer is able to read an Avro container file, and it generates one event per Avro record in the file. Each event is annotated with a header that indicates the schema used. The body of the event is the binary Avro record data, not including the schema or the rest of the container file elements.

Note that if the spool directory source must retry putting one of these events onto a channel (for example, because the channel is full), then it will reset and retry from the most recent Avro container file sync point. To reduce potential event duplication in such a failure scenario, write sync markers more frequently in your Avro input files.

Property Name Default Description
deserializer.schemaType HASH

How the schema is represented. By default, or when the value HASH is specified, the Avro schema is hashed and the hash is stored

in every event in the event header “flume.avro.schema.hash”. If LITERAL is specified, the JSON-encoded schema itself is stored

in every event in the event header “flume.avro.schema.literal”. Using LITERAL mode is relatively inefficient compared to HASH mode.

5.1.3 BlobDeserializer

This deserializer reads a Binary Large Object (BLOB) per event, typically one BLOB per file, for example a PDF or JPG file. Note that this approach is not suitable for very large objects because the entire BLOB is buffered in RAM.

 

Property Name Default Description
deserializer The FQCN of this class: org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
deserializer.maxBlobLength 100000000 The maximum number of bytes to read and buffer for a given request

6 Taildir Source

Note: This source is provided as a preview feature. It does not work on Windows.

Watch the specified files, and tail them in nearly real time once appended new lines to each file are detected. If new lines are being written, this source retries reading them, waiting for the write to complete.

This source is reliable and will not miss data, even when the tailed files rotate. It periodically writes the last read position of each file in JSON format to the given position file. If Flume is stopped or goes down for some reason, it can restart tailing from the position written in the existing position file.

In other use cases, this source can also start tailing from an arbitrary position for each file using the given position file. When there is no position file at the specified path, it starts tailing from the first line of each file by default.

Files will be consumed in order of their modification time; the file with the oldest modification time is consumed first.

This source does not rename, delete, or make any modifications to the files being tailed. Currently it does not support tailing binary files; it reads text files line by line.

 

Property Name Default Description
channels  
type The component type name, needs to be TAILDIR.
filegroups Space-separated list of file groups. Each file group indicates a set of files to be tailed.
filegroups.<filegroupName> Absolute path of the file group. Regular expression (and not file system patterns) can be used for filename only.
positionFile ~/.flume/taildir_position.json File in JSON format to record the inode, the absolute path and the last position of each tailing file.
headers.<filegroupName>.<headerKey> Header value which is the set with header key. Multiple headers can be specified for one file group.
byteOffsetHeader false Whether to add the byte offset of a tailed line to a header called ‘byteoffset’.
skipToEnd false Whether to skip the position to EOF in the case of files not written on the position file.
idleTimeout 120000 Time (ms) to close inactive files. If the closed file is appended new lines to, this source will automatically re-open it.
writePosInterval 3000 Interval time (ms) to write the last position of each file on the position file.
batchSize 100 Max number of lines to read and send to the channel at a time. Using the default is usually fine.
backoffSleepIncrement 1000 The increment for time delay before reattempting to poll for new data, when the last attempt did not find any new data.
maxBackoffSleep 5000 The max time delay between each reattempt to poll for new data, when the last attempt did not find any new data.
cachePatternMatching true

Listing directories and applying the filename regex pattern may be time consuming for directories containing thousands of files.

Caching the list of matching files can improve performance. The order in which files are consumed will also be cached.

Requires that the file system keeps track of modification times with at least a 1-second granularity.

fileHeader false Whether to add a header storing the absolute path filename.
fileHeaderKey file Header key to use when appending absolute path filename to event header.

Example for agent a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true

7 Twitter 1% firehose Source (experimental)

Warning: This source is highly experimental and may change between versions of Flume. Use at your own risk.

Experimental source that connects via the Streaming API to the 1% sample twitter firehose, continuously downloads tweets, converts them to Avro format, and sends Avro events to a downstream Flume sink. Requires the consumer and access tokens and secrets of a Twitter developer account. Required properties are in bold.

Property Name Default Description
channels  
type The component type name, needs to be org.apache.flume.source.twitter.TwitterSource
consumerKey OAuth consumer key
consumerSecret OAuth consumer secret
accessToken OAuth access token
accessTokenSecret OAuth token secret
maxBatchSize 1000 Maximum number of twitter messages to put in a single batch
maxBatchDurationMillis 1000 Maximum number of milliseconds to wait before closing a batch

Example for agent a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.twitter.TwitterSource
a1.sources.r1.channels = c1
a1.sources.r1.consumerKey = YOUR_TWITTER_CONSUMER_KEY
a1.sources.r1.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
a1.sources.r1.accessToken = YOUR_TWITTER_ACCESS_TOKEN
a1.sources.r1.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
a1.sources.r1.maxBatchSize = 10
a1.sources.r1.maxBatchDurationMillis = 200

8 Kafka Source

Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics. If you have multiple Kafka sources running, you can configure them with the same Consumer Group so each source reads a unique set of partitions for the topics.

Property Name Default Description
channels  
type The component type name, needs to be org.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.servers List of brokers in the Kafka cluster used by the source
kafka.consumer.group.id flume

Unique identifier of the consumer group. Setting the same id in multiple sources or agents indicates that they are

part of the same consumer group

kafka.topics Comma-separated list of topics the kafka consumer will read messages from.
kafka.topics.regex

Regex that defines set of topics the source is subscribed on. This property has higher priority than kafka.topics 

and overrides kafka.topics if exists.

batchSize 1000 Maximum number of messages written to Channel in one batch
batchDurationMillis 1000

Maximum time (in ms) before a batch will be written to Channel The batch will be written whenever the first of

size and time will be reached.

backoffSleepIncrement 1000

Initial and incremental wait time that is triggered when a Kafka Topic appears to be empty.

Wait period will reduce aggressive pinging of an empty Kafka Topic.

One second is ideal for ingestion use cases but a lower value may be required for low latency operations with interceptors.

maxBackoffSleep 5000

Maximum wait time that is triggered when a Kafka Topic appears to be empty.

Five seconds is ideal for ingestion use cases but a lower value may be required for low latency operations with interceptors.

useFlumeEventFormat false

By default events are taken as bytes from the Kafka topic directly into the event body.

Set to true to read events as the Flume Avro binary format. Used in conjunction with

the same property on the KafkaSink or with the parseAsFlumeEvent property on the Kafka

Channel this will preserve any Flume headers sent on the producing side.

setTopicHeader true When set to true, stores the topic of the retrieved message into a header, defined by the topicHeader property.
topicHeader topic

Defines the name of the header in which to store the name of the topic the message was received from,

if the setTopicHeader property is set to true. Care should be taken if combining with the Kafka Sink

 topicHeader property so as to avoid sending the message back to the same topic in a loop.

migrateZookeeperOffsets true

When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka.

This should be true to support seamless Kafka client migration from older versions of Flume.

Once migrated this can be set to false, though that should generally not be required.

If no Zookeeper offset is found, the Kafka configuration kafka.consumer.auto.offset.reset

defines how offsets are handled. Check Kafka documentation for details

kafka.consumer.security.protocol PLAINTEXT

Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security.

See below for additional info on secure setup.

more consumer security props  

If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties

that need to be set on consumer.

Other Kafka Consumer Properties

These properties are used to configure the Kafka Consumer. Any consumer property supported by Kafka can be used.

The only requirement is to prepend the property name with the prefix kafka.consumer.

For example: kafka.consumer.auto.offset.reset

Note: The Kafka Source overrides two Kafka consumer parameters: auto.commit.enable is set to "false" by the source, and every batch is committed. The Kafka source guarantees an at-least-once strategy of message retrieval; duplicates can be present when the source starts. The Kafka Source also provides defaults for key.deserializer (org.apache.kafka.common.serialization.StringDeserializer) and value.deserializer (org.apache.kafka.common.serialization.ByteArrayDeserializer). Modifying these parameters is not recommended.
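The kafka.consumer. prefix convention and the defaults noted above can be sketched as follows (illustrative only; Flume performs this mapping internally and the function name is hypothetical):

```python
PREFIX = "kafka.consumer."

def consumer_config(flume_props):
    """Strip the kafka.consumer. prefix from an agent's property map to
    obtain Kafka consumer properties, then layer in the defaults the
    source enforces."""
    conf = {k[len(PREFIX):]: v for k, v in flume_props.items()
            if k.startswith(PREFIX)}
    # Defaults provided by the Kafka Source (overriding is discouraged):
    conf.setdefault("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer")
    conf.setdefault("value.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    conf["auto.commit.enable"] = "false"  # the source commits once per batch
    return conf
```

Non-prefixed properties such as kafka.topics stay with the source itself and never reach the consumer.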

Deprecated Properties:

Property Name Default Description
topic Use kafka.topics
groupId flume Use kafka.consumer.group.id
zookeeperConnect

Is no longer supported by kafka consumer client since 0.9.x.

Use kafka.bootstrap.servers to establish connection with kafka cluster

 

Example of topic subscription with a comma-separated topic list:

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id

Example of topic subscription by regex:

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used

Security and Kafka Source:

The communication channel between Flume and Kafka supports secure authentication and data encryption. For secure authentication, SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.

As of now, data encryption is solely provided by SSL/TLS.

Setting kafka.consumer.security.protocol to any of the following values means:

  SASL_PLAINTEXT - Kerberos or plaintext authentication with no data encryption
  SASL_SSL - Kerberos or plaintext authentication with data encryption
  SSL - TLS based encryption with optional authentication.

Warning: There is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation. Reference: the Kafka security overview and the jira for tracking this issue: KAFKA-2561

TLS and Kafka Source:

Please read the steps described in Configuring Kafka Clients SSL to learn about additional configuration settings for fine tuning, for example any of the following: security provider, cipher suites, enabled protocols, truststore or keystore types.

Example configuration with server-side authentication and data encryption:

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SSL
a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>

Note: By default the property ssl.endpoint.identification.algorithm is not defined, so hostname verification is not performed. To enable hostname verification, set the following property:

a1.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm=HTTPS

Once enabled, clients will verify the server's fully qualified domain name (FQDN) against one of the following two fields:

  1. Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
  2. Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6

If client-side authentication is also required, then the following should additionally be added to the Flume agent configuration. Each Flume agent must have its own client certificate, which must be trusted by the Kafka brokers either individually or by their signature chain. A common example is to sign each client certificate with a single Root CA, which in turn is trusted by the Kafka brokers.

a1.sources.source1.kafka.consumer.ssl.keystore.location=/path/to/client.keystore.jks
a1.sources.source1.kafka.consumer.ssl.keystore.password=<password to access the keystore>

If the keystore and the key use different password protection, then the ssl.key.password property will provide the required additional secret for the consumer keystore:

a1.sources.source1.kafka.consumer.ssl.key.password=<password to access the key>

Kerberos and Kafka Source:

To use the Kafka source with a Kafka cluster secured with Kerberos, set the consumer.security.protocol property noted above for the consumer. The Kerberos keytab and principal to be used with the Kafka brokers are specified in a JAAS file's "KafkaClient" section. The "Client" section describes the Zookeeper connection, if needed. See the Kafka documentation for information on the JAAS file contents. The location of this JAAS file and, optionally, the system-wide kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:

JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

Example secure configuration using SASL_PLAINTEXT:

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka

Example secure configuration using SASL_SSL:

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_SSL
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>

Sample JAAS file. For reference of its content, please see the client config sections of the desired authentication mechanism (GSSAPI/PLAIN) in the Kafka documentation on SASL configuration. Since the Kafka Source may also connect to Zookeeper for offset migration, the "Client" section was also added to this example. This won't be needed unless you require offset migration, or this section is required by other secure components. Also, please make sure that the operating system user of the Flume processes has read privileges on the jaas and keytab files.

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

 

9 NetCat TCP Source

A netcat-like source that listens on a given port and turns each line of text into an event. Acts like nc -k -l [host] [port]. In other words, it opens a specified port and listens for data. The expectation is that the supplied data is newline-separated text. Each line of text is turned into a Flume event and sent via the connected channel.

Required properties are in bold.

 

Property Name Default Description
channels  
type The component type name, needs to be netcat
bind Host name or IP address to bind to
port Port # to bind to
max-line-length 512 Max line length per event body (in bytes)
ack-every-event true Respond with an “OK” for every event received
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*  

Example for agent a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

 

10  NetCat UDP Source

As per the original Netcat (TCP) source, this source listens on a given port and turns each line of text into an event, sent via the connected channel. It acts like nc -u -k -l [host] [port].

Required properties are in bold.

 

Property Name Default Description
channels  
type The component type name, needs to be netcatudp
bind Host name or IP address to bind to
port Port # to bind to
remoteAddressHeader  
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

Example for agent a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcatudp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

 

11 Sequence Generator Source

A simple sequence generator that continuously generates events with a counter that starts from 0, increments by 1, and stops at totalEvents. It retries when it can't send events to the channel. Useful mainly for testing. During retries it keeps the body of the retried messages the same as before, so that after de-duplication at the destination the number of unique events is expected to equal the specified totalEvents. Required properties are in bold.

Property Name Default Description
channels  
type The component type name, needs to be seq
selector.type   replicating or multiplexing
selector.* replicating Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    
batchSize 1 Number of events to attempt to process per request loop.
totalEvents Long.MAX_VALUE Number of unique events sent by the source.

Example for agent a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.channels = c1
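The retry-with-identical-body behaviour can be sketched as follows (illustrative; the real source writes batches of events to a channel rather than calling a function):

```python
def run_seq_source(total_events, channel_put):
    """Emit bodies "0" .. str(total_events - 1); on a failed put,
    retry with the SAME body, so downstream de-duplication still
    yields exactly total_events unique events."""
    counter = 0
    while counter < total_events:
        body = str(counter)
        try:
            channel_put(body)   # stand-in for putting an event on a channel
        except Exception:
            continue            # retry: identical body, counter unchanged
        counter += 1
```

A flaky channel may receive duplicates, but the set of distinct bodies always has exactly total_events members.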

12  Syslog Sources

Reads syslog data and generates Flume events. The UDP source treats an entire message as a single event. The TCP source creates a new event for each string of characters separated by a newline ('\n').

Required properties are in bold.

12.1  Syslog TCP Source

The original, tried-and-true syslog TCP source.

Property Name Default Description
channels  
type The component type name, needs to be syslogtcp
host Host name or IP address to bind to
port Port # to bind to
eventSize 2500 Maximum size of a single event line, in bytes
keepFields none

Setting this to ‘all’ will preserve the Priority, Timestamp and Hostname in the body of the event.

A space-separated list of fields to include is allowed as well. Currently,

the following fields can be included: priority, version, timestamp, hostname.

The values ‘true’ and ‘false’ have been deprecated in favor of ‘all’ and ‘none’.

selector.type   replicating or multiplexing
selector.* replicating Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

For example, a syslog TCP source for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

12.2 Multiport Syslog TCP Source

This is a newer, faster, multi-port capable version of the Syslog TCP source. Note that the ports configuration setting has replaced port. The multi-port capability means that it can listen on many ports at once in an efficient manner. This source uses the Apache Mina library to do that. It provides support for RFC-3164 and many common RFC-5424 formatted messages. It also provides the capability to configure the character set used on a per-port basis.

Property Name Default Description
channels  
type The component type name, needs to be multiport_syslogtcp
host Host name or IP address to bind to.
ports Space-separated list (one or more) of ports to bind to.
eventSize 2500 Maximum size of a single event line, in bytes.
keepFields none

Setting this to ‘all’ will preserve the Priority, Timestamp and Hostname in the body of the event.

A space-separated list of fields to include is allowed as well. Currently,

the following fields can be included: priority, version, timestamp, hostname.

The values ‘true’ and ‘false’ have been deprecated in favor of ‘all’ and ‘none’.

portHeader

If specified, the port number will be stored in the header of each event using the header name specified here.

This allows for interceptors and channel selectors to customize routing logic based on the incoming port.

charset.default UTF-8 Default character set used while parsing syslog events into strings.
charset.port.<port> Character set is configurable on a per-port basis.
batchSize 100 Maximum number of events to attempt to process per request loop. Using the default is usually fine.
readBufferSize 1024 Size of the internal Mina read buffer. Provided for performance tuning. Using the default is usually fine.
numProcessors (auto-detected)

Number of processors available on the system for use while processing messages.

Default is to auto-detect # of CPUs using the Java Runtime API.

Mina will spawn 2 request-processing threads per detected CPU, which is often reasonable.

selector.type replicating replicating, multiplexing, or custom
selector.* Depends on the selector.type value
interceptors Space-separated list of interceptors.
interceptors.*    

For example, a multiport syslog TCP source for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = multiport_syslogtcp
a1.sources.r1.channels = c1
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.ports = 10001 10002 10003
a1.sources.r1.portHeader = port

12.3 Syslog UDP Source

 

Property Name Default Description
channels  
type The component type name, needs to be syslogudp
host Host name or IP address to bind to
port Port # to bind to
keepFields false Setting this to true will preserve the Priority, Timestamp and Hostname in the body of the event.
selector.type   replicating or multiplexing
selector.* replicating Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

For example, a syslog UDP source for agent named a1: 

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
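
To smoke-test this source, any RFC 3164-style line sent over UDP will do. A minimal Python sketch (host and port are taken from the example above; the priority value 13 and message text are illustrative):

```python
import socket

def syslog_line(pri, timestamp, hostname, msg):
    # RFC 3164-style message, e.g. "<13>Oct 11 22:14:15 myhost hello"
    return f"<{pri}>{timestamp} {hostname} {msg}"

line = syslog_line(13, "Oct 11 22:14:15", "myhost", "hello flume")
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(line.encode("utf-8"), ("127.0.0.1", 5140))
sock.close()
```

Since UDP is fire-and-forget, the send succeeds whether or not the agent is running; check the channel (or a logger sink) to confirm delivery.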

13 HTTP source

A source which accepts Flume Events by HTTP POST and GET. GET should be used for experimentation only. HTTP requests are converted into Flume events by a pluggable “handler” which must implement the HTTPSourceHandler interface. This handler takes a HttpServletRequest and returns a list of Flume events. All events handled from one HTTP request are committed to the channel in one transaction, thus allowing for increased efficiency on channels like the file channel. If the handler throws an exception, this source will return an HTTP status of 400. If the channel is full, or the source is unable to append events to the channel, the source will return an HTTP 503 - Temporarily Unavailable status.

All events sent in one POST request are considered to be one batch and inserted into the channel in one transaction.

Property Name Default Description
type   The component type name, needs to be http
port The port the source should bind to.
bind 0.0.0.0 The hostname or IP address to listen on
handler org.apache.flume.source.http.JSONHandler The FQCN of the handler class.
handler.* Config parameters for the handler
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    
enableSSL false Set the property true, to enable SSL. HTTP Source does not support SSLv3.
excludeProtocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 is always excluded.
keystore   Location of the keystore including keystore file name
keystorePassword Keystore password

For example, an http source for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.example.rest.RestHandler
a1.sources.r1.handler.nickname = random props
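
Enabling SSL on this source only needs the properties from the table above. A sketch (the keystore path and password are placeholders, not values from this guide):

a1.sources.r1.enableSSL = true
a1.sources.r1.keystore = /path/to/keystore.jks
a1.sources.r1.keystorePassword = changeit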

13.1 JSONHandler

A handler is provided out of the box which can handle events represented in JSON format, and supports UTF-8, UTF-16 and UTF-32 character sets. The handler accepts an array of events (even if there is only one event, the event has to be sent in an array) and converts them to Flume events based on the encoding specified in the request. If no encoding is specified, UTF-8 is assumed. The events are represented as follows:

[{
  "headers" : {
             "timestamp" : "434324343",
             "host" : "random_host.example.com"
             },
  "body" : "random_body"
  },
  {
  "headers" : {
             "namenode" : "namenode.example.com",
             "datanode" : "random_datanode.example.com"
             },
  "body" : "really_random_body"
  }]

To set the charset, the request must have the content type specified as application/json; charset=UTF-8 (replace UTF-8 with UTF-16 or UTF-32 as required).

One way to create an event in the format expected by this handler is to use JSONEvent provided in the Flume SDK and use Google Gson to create the JSON string using the Gson#fromJson(Object, Type) method. The type token to pass as the 2nd argument of this method for list of events can be created by:

Type type = new TypeToken<List<JSONEvent>>() {}.getType();
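
Any HTTP client in any language can build the same payload, since the handler only cares about the JSON wire format shown above. A minimal Python sketch (the agent host and port are assumptions based on the earlier example):

```python
import json
import urllib.request

def build_payload(events):
    """Serialize (headers, body) pairs into the JSON array the
    JSONHandler expects -- always an array, even for a single event."""
    return json.dumps([{"headers": h, "body": b} for h, b in events])

payload = build_payload([
    ({"timestamp": "434324343", "host": "random_host.example.com"}, "random_body"),
])

# POST to the HTTP source configured above (uncomment once an agent is running):
# req = urllib.request.Request(
#     "http://localhost:5140",
#     data=payload.encode("utf-8"),
#     headers={"Content-Type": "application/json; charset=UTF-8"},
# )
# urllib.request.urlopen(req)
```

Note the Content-Type must carry the charset as described above, or the handler falls back to UTF-8.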

13.2  BlobHandler

By default HTTPSource splits JSON input into Flume events. As an alternative, BlobHandler is a handler for HTTPSource that returns an event that contains the request parameters as well as the Binary Large Object (BLOB) uploaded with this request, for example a PDF or JPG file. Note that this approach is not suitable for very large objects because it buffers up the entire BLOB in RAM.

Property Name Default Description
handler The FQCN of this class: org.apache.flume.sink.solr.morphline.BlobHandler
handler.maxBlobLength 100000000 The maximum number of bytes to read and buffer for a given request
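
Wiring BlobHandler into the HTTP source from the earlier example could look like the sketch below (the 10 MB limit is illustrative; the default is 100000000 bytes):

a1.sources = r1
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.handler = org.apache.flume.sink.solr.morphline.BlobHandler
a1.sources.r1.handler.maxBlobLength = 10000000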

14 Stress Source 

StressSource is an internal load-generating source implementation which is very useful for stress tests. It allows a user to configure the size of Event payload, with empty headers. A user can configure the total number of events to be sent as well as the maximum number of successful events to be delivered.

Required properties are in bold.

Property Name Default Description
type The component type name, needs to be org.apache.flume.source.StressSource
size 500 Payload size of each Event. Unit:byte
maxTotalEvents -1 Maximum number of Events to be sent
maxSuccessfulEvents -1 Maximum number of Events successfully sent
batchSize 1 Number of Events to be sent in one batch

Example for agent a1:

a1.sources = stresssource-1
a1.channels = memoryChannel-1
a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
a1.sources.stresssource-1.size = 10240
a1.sources.stresssource-1.maxTotalEvents = 1000000
a1.sources.stresssource-1.channels = memoryChannel-1

15 Legacy Sources

The legacy sources allow a Flume 1.x agent to receive events from Flume 0.9.4 agents. It accepts events in the Flume 0.9.4 format, converts them to the Flume 1.0 format, and stores them in the connected channel. The 0.9.4 event properties like timestamp, pri, host, nanos, etc get converted to 1.x event header attributes. The legacy source supports both Avro and Thrift RPC connections. To use this bridge between two Flume versions, you need to start a Flume 1.x agent with the avroLegacy or thriftLegacy source. The 0.9.4 agent should have the agent Sink pointing to the host/port of the 1.x agent.

Note: The reliability semantics of Flume 1.x are different from those of Flume 0.9.x. The E2E or DFO mode of a 0.9.x agent will not be supported by the legacy source. The only supported 0.9.x mode is Best Effort, though the reliability setting of the 1.x flow will apply to the events once they are saved into the channel by the legacy source.

Required properties are in bold.

 15.1  Avro Legacy Source

Property Name Default Description
channels  
type The component type name, needs to be org.apache.flume.source.avroLegacy.AvroLegacySource
host The hostname or IP address to bind to
port The port # to listen on
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

Example for agent a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.avroLegacy.AvroLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

15.2  Thrift Legacy Source

Property Name Default Description
channels  
type The component type name, needs to be org.apache.flume.source.thriftLegacy.ThriftLegacySource
host The hostname or IP address to bind to
port The port # to listen on
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

Example for agent a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.thriftLegacy.ThriftLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

16 Custom Source

A custom source is your own implementation of the Source interface. A custom source's class and its dependencies must be included in the agent's classpath when starting the Flume agent. The type of the custom source is its FQCN.

 

Property Name Default Description
channels  
type The component type name, needs to be your FQCN
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

Example for agent a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.example.MySource
a1.sources.r1.channels = c1

17 Scribe Source

Scribe is another type of ingest system. To adopt an existing Scribe ingest system, Flume should use the ScribeSource based on Thrift with a compatible transfer protocol. For deployment of Scribe please follow the guide from Facebook. Required properties are in bold.

 

Property Name Default Description
type The component type name, needs to be org.apache.flume.source.scribe.ScribeSource
port 1499 Port that Scribe should be connected
maxReadBufferBytes 16384000 Thrift Default FrameBuffer Size
workerThreads 5 Handing threads number in Thrift
selector.type    
selector.*

Example for agent a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.scribe.ScribeSource
a1.sources.r1.port = 1463
a1.sources.r1.workerThreads = 5
a1.sources.r1.channels = c1

 

 
