拦截器做用:拦截器是简单的插件式组件,设置在source和channel之间。source接收到的事件,在写入channel以前,拦截器均可以进行转换或者删除这些事件。每一个拦截器只处理同一个source接收到的事件。能够自定义拦截器。java
经常使用的拦截器:web
时间戳拦截器express
flume中一个最常用的拦截器 ,该拦截器的做用是将时间戳插入到flume的事件报头中。若是不使用任何拦截器,flume接受到的只有message。时间戳拦截器的配置。apache
参数 | 默认值 | 描述 |
type | 类型名称timestamp,也可使用类名的全路径 | |
preserveExisting | false | 若是设置为true,若事件中报头已经存在,不会替换时间戳报头的值 |
source链接到时间戳拦截器的配置:服务器
?app
1
2
3
|
a1.sources.r1.interceptors = timestamp
a1.sources.r1.interceptors.timestamp.type=timestamp
a1.sources.r1.interceptors.timestamp.preserveExisting=
false
|
2. 主机拦截器
less
主机拦截器插入服务器的ip地址或者主机名,agent将这些内容插入到事件的报头中。时间报头中的key使用hostHeader配置,默认是host。主机拦截器的配置
ide
参数 | 默认值 | 描述 |
type | 类型名称host | |
hostHeader | host | 事件投的key |
useIP | true | 若是设置为false,host键插入主机名 |
preserveExisting | false | 若是设置为true,若事件中报头已经存在,不会替换host报头的值 |
source链接到主机拦截器的配置:函数
?oop
1
2
3
4
|
a1.sources.r1.interceptors = host
a1.sources.r1.interceptors.host.type=host
a1.sources.r1.interceptors.host.useIP=
false
a1.sources.r1.interceptors.timestamp.preserveExisting=
true
|
3. 静态拦截器
静态拦截器的做用是将k/v插入到事件的报头中。配置以下
参数 | 默认值 | 描述 |
type |
类型名称static | |
key | key | 事件头的key |
value | value | key对应的value值 |
preserveExisting | true | 若是设置为true,若事件中报头已经存在该key,不会替换value的值 |
source链接到静态拦截器的配置:
1
2
3
4
5
|
a1.sources.r1.interceptors =
static
a1.sources.r1.interceptors.
static
.type=
static
a1.sources.r1.interceptors.
static
.key=logs
a1.sources.r1.interceptors.
static
.value=logFlume
a1.sources.r1.interceptors.
static
.preserveExisting=
false
|
4. 正则过滤拦截器
在日志采集的时候,可能有一些数据是咱们不须要的,这样添加过滤拦截器,能够过滤掉不须要的日志,也能够根据须要收集知足正则条件的日志。
参数 | 默认值 | 描述 |
type |
类型名称REGEX_FILTER | |
regex | .* | 匹配除“\n”以外的任何个字符 |
excludeEvents | false |
默认收集匹配到的事件。若是为true,则会删除匹配到的event,收集未匹配到的。 |
source链接到正则过滤拦截器的配置:
1
2
3
4
|
a1.sources.r1.interceptors = regex
a1.sources.r1.interceptors.regex.type=REGEX_FILTER
a1.sources.r1.interceptors.regex.regex=(rm)|(kill)
a1.sources.r1.interceptors.regex.excludeEvents=
false
|
这样配置的拦截器就只会接收日志消息中带有rm 或者kill的日志。
selector做用:同一个数据源分发到不一样的目的地
官网中selector共有两种类型:
Replicating Channel Selector (default)
Multiplexing Channel Selector
这两种selector的区别是:Replicating 会将source过来的events发往全部channel,而Multiplexing 能够选择该发往哪些channel。对于上面的例子来讲,若是采用Replicating ,那么demo和demo2的日志会同时发往channel1和channel2,这显然是和需求不符的,需求只是让demo的日志发往channel1,而demo2的日志发往channel2。
综上所述,咱们选择Multiplexing Channel Selector。这里咱们有遇到一个棘手的问题,Multiplexing 须要判断header里指定key的值来决定分发到某个具体的channel,咱们如今demo和demo2同时运行在同一个服务器上,若是在不一样的服务器上运行,咱们能够在 source1上加上一个 host 拦截器(上面有介绍),这样能够经过header中的host来判断event该分发给哪一个channel,而这里是在同一个服务器上,由host是区分不出来日志的来源的,咱们必须想办法在header中添加一个key来区分日志的来源。
设想一下,若是header中有一个key:flume.client.log4j.logger.source,咱们经过设置这个key的值,demo设为app1,demo2设为app2,这样咱们就能经过设置:
tier1.sources.source1.channels=channel1 channel2
tier1.sources.source1.selector.type=multiplexing
tier1.sources.source1.selector.header=flume.client.log4j.logger.source
tier1.sources.source1.selector.mapping.app1=channel1
tier1.sources.source1.selector.mapping.app2=channel2
来将不一样项目的的日志输出到不一样的channel了。
可是这个header变量从哪里来呢?
解决方法:
一、修改用到的那个source的源码,应用到client端,不一样的数据类型添加不一样的header值
Event类设计
在Flume中Event是个接口类
public interface Event {
public Map<String, String> getHeaders();
public void setHeaders(Map<String, String> headers);
public byte[] getBody();
public void setBody(byte[] body);
}
在org.apache.flume.event下, 有两个Event的具体实现类: SimpleEvent, JSonEvent.
EventBuilder类顾名思义, 采用Builder的方式来组装对象的成员, 并产生最终的对象.
public class EventBuilder {
public static Event withBody(byte[] body, Map<String, String> headers) {
Event event = new SimpleEvent();
if(body == null) {
body = new byte[0];
}
event.setBody(body);
if (headers != null) {
event.setHeaders(new HashMap<String, String>(headers));
}
return event;
}
public static Event withBody(byte[] body) {
return withBody(body,null);
}
public static Event withBody(String body, Charset charset,
Map<String, String> headers) {
return withBody(body.getBytes(charset), headers);
}
public static Event withBody(String body, Charset charset) {
return withBody(body, charset, null);
}
}
二、在source端配置interceptor,经过interceptor在header上设置变量header值
好比:
使用regex_extractor,对传过来的数据进行处理,提取出type值(若是能够的话,能够在client端的数据格式添加type值,方便使用regex_extractor提取出来)。
三、在source端自定义interceptor,在interceptor里对处理变量header
Interceptor用于过滤Event,即传入一个Event而后进行过滤加工,而后返回一个新的Event,接口以下:
public interface Interceptor {
public void initialize();
public Event intercept(Event event);
public List<Event> intercept(List<Event> events);
public void close();
}
一、public void initialize()运行前的初始化,通常不须要实现(上面的几个都没实现这个方法);
二、public Event intercept(Event event)处理单个event;
三、public List<Event> intercept(List<Event> events)批量处理event,实际上市循环调用上面的2;
四、public void close()能够作一些清理工做,上面几个也都没有实现这个方法;
五、 public interface Builder extends Configurable 构建Interceptor对象,外部使用这个Builder来获取Interceptor对象。
若是要本身定制,必需要完成上面的2,3,5。
下面,咱们来看看org.apache.flume.interceptor.HostInterceptor,其所有代码以下:
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.flume.interceptor;import java.net.InetAddress;import java.net.UnknownHostException;import java.util.List;import java.util.Map;import org.apache.flume.Context;import org.apache.flume.Event;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import static org.apache.flume.interceptor.HostInterceptor.Constants.*;/** * Simple Interceptor class that sets the host name or IP on all events * that are intercepted.<p> * The host header is named <code>host</code> and its format is either the FQDN * or IP of the host on which this interceptor is run. * * * Properties:<p> * * preserveExisting: Whether to preserve an existing value for 'host' * (default is false)<p> * * useIP: Whether to use IP address or fully-qualified hostname for 'host' * header value (default is true)<p> * * hostHeader: Specify the key to be used in the event header map for the * host name. (default is "host") <p> * * Sample config:<p> * * <code> * agent.sources.r1.channels = c1<p> * agent.sources.r1.type = SEQ<p> * agent.sources.r1.interceptors = i1<p> * agent.sources.r1.interceptors.i1.type = host<p> * agent.sources.r1.interceptors.i1.preserveExisting = true<p> * agent.sources.r1.interceptors.i1.useIP = false<p> * agent.sources.r1.interceptors.i1.hostHeader = hostname<p> * </code> * */public class HostInterceptor implements Interceptor { private static final Logger logger = LoggerFactory .getLogger(HostInterceptor.class); private final boolean preserveExisting; private final String header; private String host = null; /** * Only {@link HostInterceptor.Builder} can build me */ private HostInterceptor(boolean preserveExisting, boolean useIP, String header) { this.preserveExisting = preserveExisting; this.header = header; InetAddress addr; try { addr = InetAddress.getLocalHost(); if (useIP) { host = addr.getHostAddress(); } else { host = addr.getCanonicalHostName(); } } catch (UnknownHostException e) { logger.warn("Could not get local host address. Exception follows.", e); } } @Override public void initialize() { // no-op } /** * Modifies events in-place. */ @Override public Event intercept(Event event) { Map<String, String> headers = event.getHeaders(); if (preserveExisting && headers.containsKey(header)) { return event; } if(host != null) { headers.put(header, host); } return event; } /** * Delegates to {@link #intercept(Event)} in a loop. * @param events * @return */ @Override public List<Event> intercept(List<Event> events) { for (Event event : events) { intercept(event); } return events; } @Override public void close() { // no-op } /** * Builder which builds new instances of the HostInterceptor. */ public static class Builder implements Interceptor.Builder { private boolean preserveExisting = PRESERVE_DFLT; private boolean useIP = USE_IP_DFLT; private String header = HOST; @Override public Interceptor build() { return new HostInterceptor(preserveExisting, useIP, header); } @Override public void configure(Context context) { preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT); useIP = context.getBoolean(USE_IP, USE_IP_DFLT); header = context.getString(HOST_HEADER, HOST); } } public static class Constants { public static String HOST = "host"; public static String PRESERVE = "preserveExisting"; public static boolean PRESERVE_DFLT = false; public static String USE_IP = "useIP"; public static boolean USE_IP_DFLT = true; public static String HOST_HEADER = "hostHeader"; } }
Constants类是参数类及默认的一些参数:
Builder类是构造HostInterceptor对象的,它会首先经过configure(Context context)方法获取配置文件中interceptor的参数,而后方法build()用来返回一个HostInterceptor对象:
一、preserveExisting表示若是event的header中包含有本interceptor指定的header,是否要保留这个header,true则保留;
二、useIP表示是否使用本机IP地址做为header的value,true则使用IP,默认是true;
三、header是event的headers的key,默认是host。
HostInterceptor:
一、构造函数除了赋值外,还有就是根据useIP获取IP或者hostname;
二、intercept(Event event)方法是设置event的header的地方,首先是获取headers对象,而后若是同时知足preserveExisting==true而且headers.containsKey(header)就直接返回event,不然设置headers:headers.put(header, host)。
三、intercept(List<Event> events)方法是循环调用上述2的方法。
显然其余几个Interceptor也就相似这样。在配置文件中配置source的interceptor时,若是是本身定制的interceptor,则须要对type参数赋值:完整类名+¥Builder,好比com.MyInterceptor$Builder便可。
这样设置好headers后,就能够在后续的流转中经过selector实现细分存储。