翻译自官网flume1.8用户指南,原文地址:Flume 1.8.0 User Guidehtml
篇幅限制,分为如下5篇:java
【翻译】Flume 1.8.0 User Guide(用户指南)node
【翻译】Flume 1.8.0 User Guide(用户指南) sourcegit
【翻译】Flume 1.8.0 User Guide(用户指南) Sinkgithub
【翻译】Flume 1.8.0 User Guide(用户指南) Channel正则表达式
【翻译】Flume 1.8.0 User Guide(用户指南) Processors数据库
接收器组容许用户将多个接收器分组到一个实体中。接收器处理器可用于在组内的全部接收器上提供负载平衡功能,或在出现暂时故障时实现从一个接收器到另外一个接收器的故障转移。express
必须属性以粗体显示。apache
Property Name | Default | Description |
---|---|---|
sinks | – | Space-separated list of sinks that are participating in the group |
processor.type | default | The component type name, needs to be default, failover or load_balance |
Example for agent named a1:json
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = load_balance
默认接收器处理器只接受单个接收器。用户没必要为单个接收器建立处理器(接收器组)。相反,用户能够遵循本用户指南中前面解释的源-通道-接收器模式。
故障转移接收器处理器维护一个优先级较高的接收器列表,确保只要有一个可用的接收器,就会处理(交付)事件。
故障转移机制的工做方式是将失败的接收器降级到池中,在池中为它们分配一个冷却期,在重试以前随着顺序故障的增长而增长。一旦接收器成功发送事件,它将被恢复到活动池。接收器有一个与之相关的优先级,越大,优先级越高。若是一个接收器在发送事件时失败,那么下一个具备最高优先级的接收器将在下一次发送事件时尝试。例如,优先级为100的接收器在优先级为80的接收器以前被激活。若是没有指定优先级,则根据配置中指定接收器的顺序肯定thr优先级。
若要配置,请设置接收器组处理器进行故障转移,并为全部单个接收器设置优先级。全部指定的优先级必须是惟一的。此外,可使用maxpenalty属性设置故障转移时间的上限(以毫秒为单位)。
必须属性以粗体显示。
Property Name | Default | Description |
---|---|---|
sinks | – | Space-separated list of sinks that are participating in the group |
processor.type | default | The component type name, needs to be failover |
processor.priority.<sinkName> | – | Priority value. <sinkName> must be one of the sink instances associated with the current sink group A higher priority value Sink gets activated earlier. A larger absolute value indicates higher priority |
processor.maxpenalty | 30000 | The maximum backoff period for the failed Sink (in millis) |
Example for agent named a1:
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.k1 = 5 a1.sinkgroups.g1.processor.priority.k2 = 10 a1.sinkgroups.g1.processor.maxpenalty = 10000
负载平衡接收器处理器提供了跨多个接收器的负载平衡流的能力。它维护一个活动接收器的索引列表,其中必须分布负载。实现支持经过round_robin或随机选择机制分配负载。选择机制的选择默认为round_robin类型,可是能够经过配置覆盖。经过继承AbstractSinkSelector的自定义类支持自定义选择机制。
调用时,此选择器使用其配置的选择机制选择下一个接收器并调用它。对于round_robin和random,若是所选的接收器没法交付事件,处理器将经过其配置的选择机制选择下一个可用的接收器。此实现不会将失败的接收器列入黑名单,而是继续乐观地尝试全部可用的接收器。若是全部接收器调用都致使失败,则选择器将失败传播到接收器运行器。
若是启用了backoff,接收器处理器将把失败的接收器列入黑名单,在给定的超时中删除它们。当超时结束时,若是接收仍然是无响应的,则以指数方式增长超时,以免在无响应接收上陷入长时间等待。禁用此功能后,在循环中,全部失败的接收器负载将被传递到行中的下一个接收器,所以不会均衡
必须属性以粗体显示。
Property Name | Default | Description |
---|---|---|
processor.sinks | – | Space-separated list of sinks that are participating in the group |
processor.type | default | The component type name, needs to be load_balance |
processor.backoff | false | Should failed sinks be backed off exponentially. |
processor.selector | round_robin | Selection mechanism. Must be either round_robin, random or FQCN of custom class that inherits from AbstractSinkSelector |
processor.selector.maxTimeOut | 30000 | Used by backoff selectors to limit exponential backoff (in milliseconds) |
Example for agent named a1:
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = load_balance a1.sinkgroups.g1.processor.backoff = true a1.sinkgroups.g1.processor.selector = random
目前不支持自定义接收器处理器。
file_roll接收器和hdfs接收器都支持EventSerializer接口。下面提供了带有Flume的eventserializer的详细信息。
别名:text。此拦截器将事件体写入输出流,而不进行任何转换或修改。忽略事件标题。配置选项以下:
Property Name | Default | Description |
---|---|---|
appendNewline | true | Whether a newline will be appended to each event at write time. The default of true assumes that events do not contain newlines, for legacy reasons. |
Example for agent named a1:
a1.sinks = k1 a1.sinks.k1.type = file_roll a1.sinks.k1.channel = c1 a1.sinks.k1.sink.directory = /var/log/flume a1.sinks.k1.sink.serializer = text a1.sinks.k1.sink.serializer.appendNewline = false
别名:avro_event。
这个拦截器将Flume事件序列化到Avro容器文件中。使用的模式与Avro RPC机制中Flume事件使用的模式相同。
这个序列化器继承了AbstractAvroEventSerializer类。
配置选项以下:
Property Name | Default | Description |
---|---|---|
syncIntervalBytes | 2048000 | Avro sync interval, in approximate bytes. |
compressionCodec | null | Avro compression codec. For supported codecs, see Avro’s CodecFactory docs. |
Example for agent named a1:
a1.sinks.k1.type = hdfs a1.sinks.k1.channel = c1 a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S a1.sinks.k1.serializer = avro_event a1.sinks.k1.serializer.compressionCodec = snappy
别名:此序列化器没有别名,必须使用全限定类名类名指定。
这将Flume事件序列化到Avro容器文件中,如“Flume事件”Avro事件序列化器,可是记录模式是可配置的。记录模式能够指定为Flume配置属性,也能够在事件头中传递。
要将记录模式做为Flume配置的一部分传递,请使用下面列出的属性schemaURL。
要在事件标头中传递记录模式,请指定事件标头flume.avro.schema。包含模式或flume.av .schema的json格式表示的文本。一个能够找到模式的url (hdfs:/…支持uri)。
这个序列化器继承了AbstractAvroEventSerializer类。
配置选项以下:
Property Name | Default | Description |
---|---|---|
syncIntervalBytes | 2048000 | Avro sync interval, in approximate bytes. |
compressionCodec | null | Avro compression codec. For supported codecs, see Avro’s CodecFactory docs. |
schemaURL | null | Avro schema URL. Schemas specified in the header ovverride this option. |
Example for agent named a1:
a1.sinks.k1.type = hdfs a1.sinks.k1.channel = c1 a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder a1.sinks.k1.serializer.compressionCodec = snappy a1.sinks.k1.serializer.schemaURL = hdfs://namenode/path/to/schema.avsc
Flume可以修改/删除飞行中的事件。这是在拦截器的帮助下完成的。拦截器是实现org.apache.flume.interceptor.Interceptor接口的类。拦截器能够根据开发人员选择的任何标准修改甚至删除事件。Flume支持拦截器的连接。这能够经过在配置中指定拦截器构建器类名的列表来实现。拦截器在源配置中指定为空格分隔的列表。指定拦截器的顺序就是调用它们的顺序。一个拦截器返回的事件列表传递给链中的下一个拦截器。拦截器能够修改或删除事件。若是拦截器须要删除事件,它只是在返回的列表中不返回该事件。若是要删除全部事件,那么它只返回一个空列表。拦截器是命名组件,下面是一个经过配置建立拦截器的例子:
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.interceptors = i1 i2 a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder a1.sources.r1.interceptors.i1.preserveExisting = false a1.sources.r1.interceptors.i1.hostHeader = hostname a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d a1.sinks.k1.channel = c1
注意,拦截器构建器被传递给type config参数。拦截器自己是可配置的,能够像传递给任何其余可配置组件同样传递配置值。在上面的示例中,首先将事件传递给HostInterceptor,而后将HostInterceptor返回的事件传递给TimestampInterceptor。能够指定彻底限定类名(FQCN)或别名时间戳。若是有多个收集器写入相同的HDFS路径,那么还可使用HostInterceptor。
此拦截器将插入事件标头,即它处理事件的millis时间。这个拦截器插入一个带有键时间戳(或由header属性指定)的消息头,其值是相关的时间戳。若是配置中已有时间戳,则此拦截器能够保留该时间戳。
Property Name | Default | Description |
---|---|---|
type | – | The component type name, has to be timestamp or the FQCN |
header | timestamp | The name of the header in which to place the generated timestamp. |
preserveExisting | false | If the timestamp already exists, should it be preserved - true or false |
Example for agent named a1:
a1.sources = r1 a1.channels = c1 a1.sources.r1.channels = c1 a1.sources.r1.type = seq a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = timestamp
这个拦截器插入代理运行的主机的主机名或IP地址。它插入一个带有密钥主机的头或一个已配置密钥,该密钥的值是基于配置的主机名或主机的IP地址。
Property Name | Default | Description |
---|---|---|
type | – | The component type name, has to be host |
preserveExisting | false | If the host header already exists, should it be preserved - true or false |
useIP | true | Use the IP Address if true, else use hostname. |
hostHeader | host | The header key to be used. |
Example for agent named a1:
a1.sources = r1 a1.channels = c1 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = host
静态拦截器容许用户向全部事件附加一个具备静态值的静态标题。
当前实现不容许同时指定多个标题。相反,用户能够连接多个静态拦截器,每一个拦截器定义一个静态头。
Property Name | Default | Description |
---|---|---|
type | – | The component type name, has to be static |
preserveExisting | true | If configured header already exists, should it be preserved - true or false |
key | key | Name of header that should be created |
value | value | Static value that should be created |
Example for agent named a1:
a1.sources = r1 a1.channels = c1 a1.sources.r1.channels = c1 a1.sources.r1.type = seq a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = static a1.sources.r1.interceptors.i1.key = datacenter a1.sources.r1.interceptors.i1.value = NEW_YORK
这个拦截器经过删除一个或多个header来操纵Flume事件header。它能够删除静态定义的头、基于正则表达式的头或列表中的头。若是这些都没有定义,或者没有标题与标准匹配,就不会修改Flume事件。
注意,若是只须要删除一个头,那么经过名称指定它会比其余两个方法提供更好的性能。
Property Name | Default | Description |
---|---|---|
type | – | The component type name has to be remove_header |
withName | – | Name of the header to remove |
fromList | – | List of headers to remove, separated with the separator specified by fromListSeparator |
fromListSeparator | \s*,\s* | Regular expression used to separate multiple header names in the list specified by fromList. Default is a comma surrounded by any number of whitespace characters |
matching | – | All the headers which names match this regular expression are removed |
这个拦截器在全部被拦截的事件上设置一个统一的惟一标识符。一个示例UUID是b5755073-77a9-43c1-8fa -b7a586fc1b97,它表示128位值。
若是没有事件的应用程序级唯一键可用,能够考虑使用UUIDInterceptor自动为事件分配UUID。当事件进入Flume网络时,为它们分配uuid是很是重要的;也就是说,在第一个Flume source 的流中。这使得在为高可用性和高性能而设计的Flume网络中,面对复制和从新交付时,能够对事件进行后续重复数据删除。若是应用程序级密钥可用,这比自动生成的UUID更可取,由于它使用已知的应用程序级密钥支持数据存储中事件的后续更新和删除。
Property Name | Default | Description |
---|---|---|
type | – | The component type name has to be org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder |
headerName | id | The name of the Flume header to modify |
preserveExisting | true | If the UUID header already exists, should it be preserved - true or false |
prefix | “” | The prefix string constant to prepend to each generated UUID |
这个拦截器经过一个形态线配置文件过滤事件,该文件定义了一个转换命令链,将记录从一个命令传输到另外一个命令。例如,morphline能够忽略某些事件,或者经过基于正则表达式的模式匹配更改或插入某些事件头部,或者能够经过Apache Tika自动检测并在被截获的事件上设置MIME类型。例如,这种包嗅探能够用于Flume拓扑中基于内容的动态路由。MorphlineInterceptor还能够帮助实现到多个Apache Solr集合的动态路由(例如,对于多租户)。
目前,有一个限制,拦截器的形态线不能为每一个输入事件生成多个输出记录。这个拦截器不是为繁重的ETL处理而设计的——若是你须要的话,能够考虑将ETL处理从Flume源转移到Flume Sink,例如到MorphlineSolrSink。
必须属性以粗体显示。
Property Name | Default | Description |
---|---|---|
type | – | The component type name has to be org.apache.flume.sink.solr. morphline.MorphlineInterceptor$Builder |
morphlineFile | – | The relative or absolute path on the local file system to the morphline configuration file. Example: /etc/flume-ng/conf/morphline.conf |
morphlineId | null | Optional name used to identify a morphline if there are multiple morphlines in a morphline config file |
Sample flume.conf file:
a1.sources.avroSrc.interceptors = morphlineinterceptor a1.sources.avroSrc.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineId = morphline1
这个拦截器基于Java正则表达式提供简单的基于字符串的搜索和替换功能。也可使用回溯/组捕获。这个拦截器使用与Java Matcher.replaceAll()方法中相同的规则。
Property Name | Default | Description |
---|---|---|
type | – | The component type name has to be search_replace |
searchPattern | – | The pattern to search for and replace. |
replaceString | – | The replacement string. |
charset | UTF-8 | The charset of the event body. Assumed by default to be UTF-8. |
Example configuration:
a1.sources.avroSrc.interceptors = search-replace a1.sources.avroSrc.interceptors.search-replace.type = search_replace # Remove leading alphanumeric characters in an event body. a1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_]+ a1.sources.avroSrc.interceptors.search-replace.replaceString =
Another example:
a1.sources.avroSrc.interceptors = search-replace a1.sources.avroSrc.interceptors.search-replace.type = search_replace # Use grouping operators to reorder and munge words on a line. a1.sources.avroSrc.interceptors.search-replace.searchPattern = The quick brown ([a-z]+) jumped over the lazy ([a-z]+) a1.sources.avroSrc.interceptors.search-replace.replaceString = The hungry $2 ate the careless $1
这个拦截器经过将事件体解释为文本并根据配置的正则表达式匹配文原本选择性地过滤事件。提供的正则表达式可用于包含事件或排除事件。
Property Name | Default | Description |
---|---|---|
type | – | The component type name has to be regex_filter |
regex | ”.*” | Regular expression for matching against events |
excludeEvents | false | If true, regex determines events to exclude, otherwise regex determines events to include. |
这个拦截器使用指定的正则表达式提取regex匹配组,并将匹配组追加为事件的头部。它还支持可插入的序列化器,用于在将匹配组添加为事件头以前对其进行格式化。
Property Name | Default | Description |
---|---|---|
type | – | The component type name has to be regex_extractor |
regex | – | Regular expression for matching against events |
serializers | – | Space-separated list of serializers for mapping matches to header names and serializing their values. (See example below) Flume provides built-in support for the following serializers: org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializerorg. apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer |
serializers.<s1>.type | default | Must be default (org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer), org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer, or the FQCN of a custom class that implements org.apache.flume.interceptor.RegexExtractorInterceptorSerializer |
serializers.<s1>.name | – | |
serializers.* | – | Serializer-specific properties |
序列化器用于将匹配映射到标题名称和格式化的标题值;默认状况下,您只须要指定标题名称,并使用默认的org.apache.flume.interceptor. regexextractorinterceptorpassthrough序列化器。这个序列化器只是将匹配映射到指定的头名称,并在regex提取值时传递该值。您可使用彻底限定类名(FQCN)将自定义序列化器实现插入提取器中,以按照您喜欢的方式格式化匹配。
若是水槽事件体包含1:2:3.4foobar5,则使用如下配置
a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d) a1.sources.r1.interceptors.i1.serializers = s1 s2 s3 a1.sources.r1.interceptors.i1.serializers.s1.name = one a1.sources.r1.interceptors.i1.serializers.s2.name = two a1.sources.r1.interceptors.i1.serializers.s3.name = three
提取的事件将包含相同的主体,可是添加了如下头部:1 =>1,2 =>2,3 =>3
若是水槽事件体包含2012-10-18 18:47:57,614,则使用一些日志行,并使用如下配置
a1.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d) a1.sources.r1.interceptors.i1.serializers = s1 a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm
提取的事件将包含相同的主体,可是添加了如下标头的时间戳=>1350611220000
Property Name | Default | Description |
---|---|---|
flume.called.from.service | – | If this property is specified then the Flume agent will continue polling for the config file even if the config file is not found at the expected location. Otherwise, the Flume agent will terminate if the config doesn’t exist at the expected location. No property value is needed when setting this property (eg, just specifying -Dflume.called.from.service is enough) |
Flume每30秒按期轮询指定配置文件的更改。若是第一次轮询现有文件,或者自上次轮询以来已有文件的修改日期发生了更改,则Flume代理将从配置文件加载新配置。重命名或移动文件不会改变其修改时间。当Flume代理轮询不存在的文件时,会发生两种状况之一:1。当代理第一次轮询不存在的配置文件时,代理将根据flume.call .from.service属性进行操做。若是设置了属性,那么代理将继续轮询(始终在同一时间段—每30秒)。若是属性未设置,则代理将当即终止。2. 当代理轮询一个不存在的配置文件,而且这不是该文件第一次轮询时,那么代理在此轮询期间不进行任何配置更改。代理继续轮询而不是终止。
将Log4j事件追加到flume代理的avro源。使用这个appender的客户机必须在类路径中包含flume-ng-sdk(例如,flume-ng-sdk-1.8.0.jar)。必须属性以粗体显示。
Property Name | Default | Description |
---|---|---|
Hostname | – | The hostname on which a remote Flume agent is running with an avro source. |
Port | – | The port at which the remote Flume agent’s avro source is listening. |
UnsafeMode | false | If true, the appender will not throw exceptions on failure to send the events. |
AvroReflectionEnabled | false | Use Avro Reflection to serialize Log4j events. (Do not use when users log strings) |
AvroSchemaUrl | – | A URL from which the Avro schema can be retrieved. |
Sample log4j.properties file:
#... log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender log4j.appender.flume.Hostname = example.com log4j.appender.flume.Port = 41414 log4j.appender.flume.UnsafeMode = true # configure a class's logger to output to the flume appender log4j.logger.org.example.MyClass = DEBUG,flume #...
默认状况下,经过调用toString()或使用Log4j布局(若是指定的话)将每一个事件转换为一个字符串。
若是事件是org.apache.avro.generic.GenericRecord的实例。 org.apache.avro.specific.SpecificRecord, 若是属性AvroReflectionEnabled设置为true,则使用Avro序列化对事件进行序列化。
使用其Avro模式序列化每一个事件的效率很低,所以最好提供一个模式URL,下游接收器(一般是HDFS接收器)能够从中检索模式。若是没有指定AvroSchemaUrl,则模式将做为Flume标头包含。
示例log4j。配置为使用Avro序列化的属性文件:
#... log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender log4j.appender.flume.Hostname = example.com log4j.appender.flume.Port = 41414 log4j.appender.flume.AvroReflectionEnabled = true log4j.appender.flume.AvroSchemaUrl = hdfs://namenode/path/to/schema.avsc # configure a class's logger to output to the flume appender log4j.logger.org.example.MyClass = DEBUG,flume #...
将Log4j事件追加到flume代理的avro源列表中。使用这个appender的客户机必须在类路径中包含flume-ng-sdk(例如,flume-ng-sdk-1.8.0.jar)。这个附加程序支持执行负载平衡的循环和随机方案。它还支持可配置的backoff超时,以便临时从所需的主机属性集中删除down代理。
Property Name | Default | Description |
---|---|---|
Hosts | – | A space-separated list of host:port at which Flume (through an AvroSource) is listening for events |
Selector | ROUND_ROBIN | Selection mechanism. Must be either ROUND_ROBIN, RANDOM or custom FQDN to class that inherits from LoadBalancingSelector. |
MaxBackoff | – | A long value representing the maximum amount of time in milliseconds the Load balancing client will backoff from a node that has failed to consume an event. Defaults to no backoff |
UnsafeMode | false | If true, the appender will not throw exceptions on failure to send the events. |
AvroReflectionEnabled | false | Use Avro Reflection to serialize Log4j events. |
AvroSchemaUrl | – | A URL from which the Avro schema can be retrieved. |
Sample log4j.properties file configured using defaults:
#... log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender log4j.appender.out2.Hosts = localhost:25430 localhost:25431 # configure a class's logger to output to the flume appender log4j.logger.org.example.MyClass = DEBUG,flume #...
Sample log4j.properties file configured using RANDOM load balancing:
#... log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender log4j.appender.out2.Hosts = localhost:25430 localhost:25431 log4j.appender.out2.Selector = RANDOM # configure a class's logger to output to the flume appender log4j.logger.org.example.MyClass = DEBUG,flume #...
Sample log4j.properties file configured using backoff:
#... log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender log4j.appender.out2.Hosts = localhost:25430 localhost:25431 localhost:25432 log4j.appender.out2.Selector = ROUND_ROBIN log4j.appender.out2.MaxBackoff = 30000 # configure a class's logger to output to the flume appender log4j.logger.org.example.MyClass = DEBUG,flume #...
HDFS接收器、HBase接收器、Thrift source、Thrift接收器和Kite数据集接收器都支持Kerberos身份验证。有关配置与kerberos相关的选项,请参阅相应的部分。
Flume代理将做为一个主体对kerberos KDC进行身份验证,须要kerberos身份验证的不一样组件将使用这个主体。为Thrift source、Thrift sink、HDFS sink、HBase sink和DataSet sink配置的principal和keytab应该相同,不然组件将没法启动。
Flume的监测工做仍在进行中, 变化常常发生。几个Flume组件向JMX平台MBean服务器报告指标。可使用Jconsole查询这些指标。
能够经过使用flume-env在JAVA_OPTS环境变量中指定JMX参数来启用JMX报告。就像
export JAVA_OPTS=”-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5445
-Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false”
注意:上面的示例禁用安全性。要启用安全性,请参阅http://docs.oracle.com/javase/6/docs/technotes/guides/management/agent.html
Flume还能够向Ganglia 3或Ganglia 3.1 metanode报告这些指标。要向Ganglia报告指标,必须使用这种支持启动flume代理。启动Flume代理时,必须将如下参数做为系统属性传递给Flume .monitoring.,可在flume-env.sh中指定:
Property Name | Default | Description |
---|---|---|
type | – | The component type name, has to be ganglia |
hosts | – | Comma-separated list of hostname:port of Ganglia servers |
pollFrequency | 60 | Time, in seconds, between consecutive reporting to Ganglia server |
isGanglia3 | false | Ganglia server version is 3. By default, Flume sends in Ganglia 3.1 format |
咱们可使用Ganglia支持启动Flume,以下所示:
$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=com.example:1234,com.example2:5455
Flume还能够以JSON格式报告指标。要启用JSON格式的报表,Flume在一个可配置端口上驻留一个Web服务器。Flume以如下JSON格式报告指标:
{ "typeName1.componentName1" : {"metric1" : "metricValue1", "metric2" : "metricValue2"}, "typeName2.componentName2" : {"metric3" : "metricValue3", "metric4" : "metricValue4"} }
下面是一个例子:
{ "CHANNEL.fileChannel":{"EventPutSuccessCount":"468085", "Type":"CHANNEL", "StopTime":"0", "EventPutAttemptCount":"468086", "ChannelSize":"233428", "StartTime":"1344882233070", "EventTakeSuccessCount":"458200", "ChannelCapacity":"600000", "EventTakeAttemptCount":"458288"}, "CHANNEL.memChannel":{"EventPutSuccessCount":"22948908", "Type":"CHANNEL", "StopTime":"0", "EventPutAttemptCount":"22948908", "ChannelSize":"5", "StartTime":"1344882209413", "EventTakeSuccessCount":"22948900", "ChannelCapacity":"100", "EventTakeAttemptCount":"22948908"} }
Property Name | Default | Description |
---|---|---|
type | – | The component type name, has to be http |
port | 41414 | The port to start the server on. |
咱们可使用JSON报表支持启动Flume,以下所示:
$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
而后,Metrics将在http://<hostname>:<port>/metrics网页上提供。定制组件能够报告上面Ganglia部分中提到的指标。
经过编写执行报告的服务器,能够向其余系统报告指标。任何报告类都必须实现接口org.apache.flume.instrument . monitorservice。此类类的使用方式与GangliaServer用于报告的方式相同。他们能够轮询平台mbean服务器以轮询mbean以得到度量。例如,若是一个名为httpre的HTTP监控服务可使用以下方式:
$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=com.example.reporting.HTTPReporting -Dflume.monitoring.node=com.example:332
Property Name | Default | Description |
---|---|---|
type | – | The component type name, has to be FQCN |
任何自定义flume组件都应该继承自org.apache.flume.instrumentation.MonitoredCounterGroup类。而后类应该为它公开的每一个度量提供getter方法。参见下面的代码。MonitoredCounterGroup须要一个属性列表,该类公开这些属性的指标。到目前为止,该类只支持将度量做为长值公开。
public class SinkCounter extends MonitoredCounterGroup implements SinkCounterMBean { private static final String COUNTER_CONNECTION_CREATED = "sink.connection.creation.count"; private static final String COUNTER_CONNECTION_CLOSED = "sink.connection.closed.count"; private static final String COUNTER_CONNECTION_FAILED = "sink.connection.failed.count"; private static final String COUNTER_BATCH_EMPTY = "sink.batch.empty"; private static final String COUNTER_BATCH_UNDERFLOW = "sink.batch.underflow"; private static final String COUNTER_BATCH_COMPLETE = "sink.batch.complete"; private static final String COUNTER_EVENT_DRAIN_ATTEMPT = "sink.event.drain.attempt"; private static final String COUNTER_EVENT_DRAIN_SUCCESS = "sink.event.drain.sucess"; private static final String[] ATTRIBUTES = { COUNTER_CONNECTION_CREATED, COUNTER_CONNECTION_CLOSED, COUNTER_CONNECTION_FAILED, COUNTER_BATCH_EMPTY, COUNTER_BATCH_UNDERFLOW, COUNTER_BATCH_COMPLETE, COUNTER_EVENT_DRAIN_ATTEMPT, COUNTER_EVENT_DRAIN_SUCCESS }; public SinkCounter(String name) { super(MonitoredCounterGroup.Type.SINK, name, ATTRIBUTES); } @Override public long getConnectionCreatedCount() { return get(COUNTER_CONNECTION_CREATED); } public long incrementConnectionCreatedCount() { return increment(COUNTER_CONNECTION_CREATED); } }
文件通道完整性工具验证文件通道中单个事件的完整性,并删除损坏的事件。
工具能够运行以下:
$bin/flume-ng tool --conf ./conf FCINTEGRITYTOOL -l ./datadir
其中datadir是要验证的数据目录的逗号分隔列表。
如下是可用的选项
Option Name | Description |
---|---|
h/help | Displays help |
l/dataDirs | Comma-separated list of data directories which the tool must verify |
事件验证器工具可用于以特定于应用程序的方式验证文件通道事件。该工具对每一个事件应用用户提供程序验证登陆,并删除不符合逻辑的事件。
工具能够运行以下:
$bin/flume-ng tool --conf ./conf FCINTEGRITYTOOL -l ./datadir -e org.apache.flume.MyEventValidator -DmaxSize 2000
其中datadir是要验证的数据目录的逗号分隔列表。
如下是可用的选项
Option Name | Description |
---|---|
h/help | Displays help |
l/dataDirs | Comma-separated list of data directories which the tool must verify |
e/eventValidator | Fully Qualified Name of Event Validator Implementation. The jar must be on Flume classpath |
事件验证器实现必须实现EventValidator接口。建议不要从实现中抛出任何异常,由于它们被视为无效事件。其余参数能够经过-D选项传递给EventValitor实现。
让咱们看一个简单的基于大小的事件验证器示例,它将拒绝大于指定的最大大小的事件。
public static class MyEventValidator implements EventValidator { private int value = 0; private MyEventValidator(int val) { value = val; } @Override public boolean validateEvent(Event event) { return event.getBody() <= value; } public static class Builder implements EventValidator.Builder { private int sizeValidator = 0; @Override public EventValidator build() { return new DummyEventVerifier(sizeValidator); } @Override public void configure(Context context) { binaryValidator = context.getInteger("maxSize"); } } }
Flume很是灵活,容许大量可能的部署场景。若是您计划在大型生产部署中使用Flume,那么明智的作法是花一些时间考虑如何用Flume拓扑来表示问题。本节将介绍一些注意事项。
若是您须要将文本日志数据导入Hadoop/HDFS中,那么Flume正好适合您的问题,彻底中止。对于其余用例,这里有一些指导方针:
Flume的设计目的是在相对稳定、潜在复杂的拓扑结构上传输和摄取常规生成的事件数据。“事件数据”的概念定义很是普遍。对Flume来讲,事件只是一个普通的字节blob。对于事件的大小有一些限制—例如,它不能大于您能够存储在内存或单个机器上的磁盘上的内容—可是在实践中,flume事件能够是从文本日志条目到图像文件的全部内容。事件的关键属性是以连续的流方式生成的。若是您的数据不是按期生成的(例如,您正在尝试将单个批量数据加载到Hadoop集群中),那么Flume仍然能够工做,可是对于您的状况来讲,这可能有些过头了。Flume喜欢相对稳定的拓扑结构。您的拓扑不须要是不可变的,由于Flume能够在不丢失数据的状况下处理拓扑中的更改,还能够容忍因为故障转移或供应而按期进行从新配置。若是您天天都尝试更改拓扑,那么它可能不会很好地工做,由于从新配置须要一些思考和开销。
Flume流量的可靠性取决于几个因素。经过调整这些因素,您能够经过Flume实现普遍的可靠性选项。
你使用什么类型的频道?Flume既有持久通道(将数据持久化到磁盘上的通道),也有非持久通道(若是机器发生故障将丢失数据的通道)。持久通道使用基于磁盘的存储,存储在此类通道中的数据将在机器重启或与磁盘无关的故障之间持续存在。
是否为工做负载提供了足够的通道。Flume中的通道在不一样的跃点上充当缓冲器。这些缓冲器的容量是固定的,一旦容量满了,就会对流中较早的点产生反压力。若是这种压力传播到流的源头,水槽将不可用,可能会丢失数据。
是否使用冗余拓扑。Flume让您能够跨冗余拓扑复制流。这能够提供一个很是容易的容错源,克服磁盘或机器故障。
考虑Flume拓扑中的可靠性的最佳方法是考虑各类故障场景及其结果。若是磁盘出现故障怎么办?若是机器故障了怎么办?若是你的终端接收器(如HDFS)降低一段时间,你有背压,会发生什么?可能的设计空间很大,可是您须要问的基本问题却不多。
设计Flume拓扑的第一步是枚举数据的全部源和目标(终端接收器)。这些将定义拓扑的边缘点。接下来要考虑的是是否引入中间聚合层或事件路由。若是您正在从大量源中收集数据,为了简化在终端接收器上的摄取,聚合这些数据是颇有帮助的。聚合层还能够充当缓冲区,消除源的突发性或汇聚处的不可用性。若是您在不一样位置之间路由数据,您可能还但愿在不一样的点上分割流:这将建立子拓扑,这些拓扑自己可能包含聚合点。
一旦您了解了拓扑的外观,下一个问题就是须要多少硬件和网络容量。首先要量化生成的数据量。这并不老是一项简单的任务!大多数数据流都是突发性的(例如,因为昼夜模式),而且可能没法预测。一个好的起点是考虑拓扑的每一层的最大吞吐量,包括每秒的事件数和每秒的字节数。一旦您知道了给定层所需的吞吐量,就能够计算出该层须要多少节点的下限。为了肯定可达到的吞吐量,最好在硬件上使用合成的或取样的事件数据对Flume进行试验。通常来讲,基于磁盘的通道应该获得10 MB/s,基于内存的通道应该获得100 MB/s或更多。可是,根据硬件和操做环境的不一样,性能差异很大。
调整聚合吞吐量的大小能够为每一层所需的节点数量提供一个下限。增长节点的缘由有不少,好比增长冗余和更好地吸取负载中的突发事件。
若是Flume代理宕机,则该代理上承载的全部流都将停止。一旦代理从新启动,则流将恢复。使用文件流通道或其余稳定的通道将恢复处理事件离开。若是代理不能从新启动在相同的硬件上,而后有一个选项将数据库迁移到另外一个硬件和设置一个新的水槽代理,能够保存在数据库恢复处理事件。能够利用数据库HA futures将Flume代理移动到另外一个主机。
目前Flume支持HDFS 0.20.2和0.23
TBD
TBD
TBD
TBD
Component Interface | Type Alias | Implementation Class |
---|---|---|
org.apache.flume.Channel | memory | org.apache.flume.channel.MemoryChannel |
org.apache.flume.Channel | jdbc | org.apache.flume.channel.jdbc.JdbcChannel |
org.apache.flume.Channel | file | org.apache.flume.channel.file.FileChannel |
org.apache.flume.Channel | – | org.apache.flume.channel.PseudoTxnMemoryChannel |
org.apache.flume.Channel | – | org.example.MyChannel |
org.apache.flume.Source | avro | org.apache.flume.source.AvroSource |
org.apache.flume.Source | netcat | org.apache.flume.source.NetcatSource |
org.apache.flume.Source | seq | org.apache.flume.source.SequenceGeneratorSource |
org.apache.flume.Source | exec | org.apache.flume.source.ExecSource |
org.apache.flume.Source | syslogtcp | org.apache.flume.source.SyslogTcpSource |
org.apache.flume.Source | multiport_syslogtcp | org.apache.flume.source.MultiportSyslogTCPSource |
org.apache.flume.Source | syslogudp | org.apache.flume.source.SyslogUDPSource |
org.apache.flume.Source | spooldir | org.apache.flume.source.SpoolDirectorySource |
org.apache.flume.Source | http | org.apache.flume.source.http.HTTPSource |
org.apache.flume.Source | thrift | org.apache.flume.source.ThriftSource |
org.apache.flume.Source | jms | org.apache.flume.source.jms.JMSSource |
org.apache.flume.Source | – | org.apache.flume.source.avroLegacy.AvroLegacySource |
org.apache.flume.Source | – | org.apache.flume.source.thriftLegacy.ThriftLegacySource |
org.apache.flume.Source | – | org.example.MySource |
org.apache.flume.Sink | null | org.apache.flume.sink.NullSink |
org.apache.flume.Sink | logger | org.apache.flume.sink.LoggerSink |
org.apache.flume.Sink | avro | org.apache.flume.sink.AvroSink |
org.apache.flume.Sink | hdfs | org.apache.flume.sink.hdfs.HDFSEventSink |
org.apache.flume.Sink | hbase | org.apache.flume.sink.hbase.HBaseSink |
org.apache.flume.Sink | asynchbase | org.apache.flume.sink.hbase.AsyncHBaseSink |
org.apache.flume.Sink | elasticsearch | org.apache.flume.sink.elasticsearch.ElasticSearchSink |
org.apache.flume.Sink | file_roll | org.apache.flume.sink.RollingFileSink |
org.apache.flume.Sink | irc | org.apache.flume.sink.irc.IRCSink |
org.apache.flume.Sink | thrift | org.apache.flume.sink.ThriftSink |
org.apache.flume.Sink | – | org.example.MySink |
org.apache.flume.ChannelSelector | replicating | org.apache.flume.channel.ReplicatingChannelSelector |
org.apache.flume.ChannelSelector | multiplexing | org.apache.flume.channel.MultiplexingChannelSelector |
org.apache.flume.ChannelSelector | – | org.example.MyChannelSelector |
org.apache.flume.SinkProcessor | default | org.apache.flume.sink.DefaultSinkProcessor |
org.apache.flume.SinkProcessor | failover | org.apache.flume.sink.FailoverSinkProcessor |
org.apache.flume.SinkProcessor | load_balance | org.apache.flume.sink.LoadBalancingSinkProcessor |
org.apache.flume.SinkProcessor | – | |
org.apache.flume.interceptor.Interceptor | timestamp | org.apache.flume.interceptor.TimestampInterceptor$Builder |
org.apache.flume.interceptor.Interceptor | host | org.apache.flume.interceptor.HostInterceptor$Builder |
org.apache.flume.interceptor.Interceptor | static | org.apache.flume.interceptor.StaticInterceptor$Builder |
org.apache.flume.interceptor.Interceptor | regex_filter | org.apache.flume.interceptor.RegexFilteringInterceptor$Builder |
org.apache.flume.interceptor.Interceptor | regex_extractor | org.apache.flume.interceptor.RegexFilteringInterceptor$Builder |
org.apache.flume.channel.file.encryption.KeyProvider$Builder | jceksfile | org.apache.flume.channel.file.encryption.JCEFileKeyProvider |
org.apache.flume.channel.file.encryption.KeyProvider$Builder | – | org.example.MyKeyProvider |
org.apache.flume.channel.file.encryption.CipherProvider | aesctrnopadding | org.apache.flume.channel.file.encryption.AESCTRNoPaddingProvider |
org.apache.flume.channel.file.encryption.CipherProvider | – | org.example.MyCipherProvider |
org.apache.flume.serialization.EventSerializer$Builder | text | org.apache.flume.serialization.BodyTextEventSerializer$Builder |
org.apache.flume.serialization.EventSerializer$Builder | avro_event | org.apache.flume.serialization.FlumeEventAvroEventSerializer$Builder |
org.apache.flume.serialization.EventSerializer$Builder | – | org.example.MyEventSerializer$Builder |
在上面特定于组件的示例中使用了这些别名约定,以保持全部示例中的名称简短且一致。
Alias Name | Alias Type |
---|---|
a | agent |
c | channel |
r | source |
k | sink |
g | sink group |
i | interceptor |
y | key |
h | host |
s | serializer |
翻译自官网flume1.8用户指南,原文地址:Flume 1.8.0 User Guide
篇幅限制,分为如下5篇:
【翻译】Flume 1.8.0 User Guide(用户指南)
【翻译】Flume 1.8.0 User Guide(用户指南) source
【翻译】Flume 1.8.0 User Guide(用户指南) Sink
【翻译】Flume 1.8.0 User Guide(用户指南) Channel
【翻译】Flume 1.8.0 User Guide(用户指南) Processors
完结
感谢有道翻译,主要是他的功能,我就是个搬砖的