A Flume Sink takes data from a Flume Channel and writes it out to storage or to another Flume Source. When a Flume Agent starts, it creates a SinkRunner object for each Sink, and SinkRunner.start() spawns a new thread to manage that Sink's lifecycle. Every Sink must implement the start(), stop() and process() methods. You can initialize the Sink's parameters and state in start(), and release the Sink's resources in stop(). The most important method is process(), which handles the data taken from the Channel. In addition, if the Sink has configuration of its own, it must implement the Configurable interface.
Because the Sinks that Flume ships with often do not meet real-world requirements, we can write a custom Sink for our specific needs, here using ElasticSearch as the example. The Sink implements a simple document insert. The example uses Flume 1.7.
1. Writing the code
First, create a class ElasticSearchSink that extends AbstractSink. Since we also want the Sink to have its own configuration, it implements the Configurable interface as well:
public class ElasticSearchSink extends AbstractSink implements Configurable
The ElasticSearch host and the index name can be placed in the configuration file, which is simply Flume's conf file. Override Configurable's configure() method to read the configuration:
@Override
public void configure(Context context) {
    esHost = context.getString("es_host");
    esIndex = context.getString("es_index");
}
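Flume's Context also lets you supply a default value, e.g. context.getString("es_host", "localhost"), which is handy for optional settings. A minimal self-contained sketch of that lookup behavior, using a plain Map as a stand-in for the real Context class:

```java
import java.util.HashMap;
import java.util.Map;

public class ContextSketch {
    // Stand-in for Context.getString(key, defaultValue):
    // returns the configured value, or the default when the key is absent
    static String getString(Map<String, String> ctx, String key, String def) {
        return ctx.getOrDefault(key, def);
    }

    public static void main(String[] args) {
        Map<String, String> ctx = new HashMap<>();
        ctx.put("es_host", "192.168.50.213");
        System.out.println(getString(ctx, "es_host", "localhost"));      // configured value wins
        System.out.println(getString(ctx, "es_index", "default_index")); // falls back to default
    }
}
```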
Note the syntax for the configuration items "es_host" and "es_index" in the conf file:
agent.sinks = sink1
agent.sinks.sink1.type = nick.test.flume.ElasticSearchSink
agent.sinks.sink1.es_host = 192.168.50.213
agent.sinks.sink1.es_index = vehicle_event_test
Next, implement the process() method. This method needs to obtain the Channel, since all data comes from it. Before taking a message, you must first open a transaction on the Channel, and after processing you must commit and close that transaction. Only then does the Channel know the message has been fully consumed and can be removed from its internal queue. If consumption fails and the message needs to be redelivered, you can roll back the transaction. This transaction mechanism is the key to Flume's message-reliability guarantee.
process() must return a value of the Status enum, either READY or BACKOFF. If you took a message and processed it successfully, return READY. If the message you took is null, return BACKOFF. Backoff (failure compensation) means that when the Sink cannot get a message, the Sink's PollingRunner thread waits for a backoff interval, giving the Channel time to accumulate data before the next polling attempt.
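The take / commit / rollback / READY-vs-BACKOFF flow described above can be sketched standalone. The Channel and its transaction semantics here are tiny made-up stand-ins (a real Flume Channel separates Transaction from Channel), kept just to show the control flow:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class TxnSketch {
    // Hypothetical stand-in for a Flume Channel with transactional take
    static class Channel {
        final Deque<String> queue = new ArrayDeque<>();
        String pending;
        String take() { pending = queue.poll(); return pending; }
        void commit() { pending = null; }                                 // message permanently consumed
        void rollback() { if (pending != null) queue.addFirst(pending); } // message redelivered later
    }

    // Mirrors the shape of process(): take -> deliver -> commit, or rollback on failure
    static String consumeOne(Channel ch) {
        String event = ch.take();
        try {
            if (event == null) {
                return "BACKOFF";          // nothing to do; let the runner back off
            }
            // ... deliver the event to the external store here ...
            ch.commit();
            return "READY";
        } catch (RuntimeException e) {
            ch.rollback();                 // keep the message for redelivery
            return "BACKOFF";
        }
    }

    public static void main(String[] args) {
        Channel ch = new Channel();
        ch.queue.add("e1");
        System.out.println(consumeOne(ch)); // READY
        System.out.println(consumeOne(ch)); // BACKOFF (queue is empty)
    }
}
```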
The complete code:
public class ElasticSearchSink extends AbstractSink implements Configurable {

    private String esHost;
    private String esIndex;
    private TransportClient client;

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;

        // Start transaction
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
            Event event = ch.take();
            if (event != null) {
                String body = new String(event.getBody(), "UTF-8");

                BulkRequestBuilder bulkRequest = client.prepareBulk();
                JSONObject obj = JSONObject.parseObject(body);

                String vehicleId = obj.getString("vehicle_id");
                String eventBeginCode = obj.getString("event_begin_code");
                String eventBeginTime = obj.getString("event_begin_time");

                // doc id in index
                String id = (vehicleId + "_" + eventBeginTime + "_" + eventBeginCode).trim();

                JSONObject json = new JSONObject();
                json.put("vehicle_id", vehicleId);

                // index name is also used as the type name here
                bulkRequest.add(client.prepareIndex(esIndex, esIndex).setId(id).setSource(json));
                BulkResponse bulkResponse = bulkRequest.get();

                status = Status.READY;
            } else {
                status = Status.BACKOFF;
            }
            txn.commit();
        } catch (Throwable t) {
            txn.rollback();
            t.printStackTrace();
            status = Status.BACKOFF;
        } finally {
            txn.close();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        esHost = context.getString("es_host");
        esIndex = context.getString("es_index");
    }

    @Override
    public synchronized void start() {
        try {
            Settings settings = Settings.builder()
                    .put("cluster.name", "elasticsearch").build();
            client = new PreBuiltTransportClient(settings)
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esHost), 9300));
            super.start();
            System.out.println("finish start");
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    @Override
    public synchronized void stop() {
        if (client != null) {
            client.close();
        }
        super.stop();
    }
}
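One detail worth noting: because a rollback causes the same message to be redelivered, building a deterministic document id from the event fields makes re-inserts idempotent, since a retry overwrites the same ElasticSearch document instead of creating a duplicate. The id construction can be checked standalone (the field values below are made up for illustration):

```java
public class DocIdSketch {
    // Same id construction as in the sink: vehicle_id + "_" + event_begin_time + "_" + event_begin_code
    static String docId(String vehicleId, String eventBeginTime, String eventBeginCode) {
        return (vehicleId + "_" + eventBeginTime + "_" + eventBeginCode).trim();
    }

    public static void main(String[] args) {
        System.out.println(docId("V001", "1514736000000", "E01")); // V001_1514736000000_E01
    }
}
```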
2. Packaging, configuration and running
Because this is a custom Sink, it must be packaged as a jar and copied into Flume's lib directory. Then configure the agent's conf file and start Flume. In this example I use a Kafka source, a memory channel and the custom Sink; the complete configuration file is:
agent.sources = source1
agent.channels = channel1
agent.sinks = sink1
agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.source1.channels = channel1
agent.sources.source1.batchSize = 1
agent.sources.source1.batchDurationMillis = 2000
agent.sources.source1.kafka.bootstrap.servers = 192.168.50.116:9092,192.168.50.117:9092,192.168.50.118:9092,192.168.50.226:9092
agent.sources.source1.kafka.topics = iov-vehicle-event
agent.sources.source1.kafka.consumer.group.id = flume-vehicle-event-nick
agent.sinks.sink1.type = nick.test.flume.ElasticSearchSink
agent.sinks.sink1.es_host = 192.168.50.213
agent.sinks.sink1.es_index = vehicle_event_test
agent.sinks.sink1.channel = channel1
agent.channels.channel1.type = memory
agent.channels.channel1.capacity = 1000
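Besides capacity, the memory channel also has a transactionCapacity setting (the maximum number of events per transaction, default 100), and capacity must be at least as large as transactionCapacity. If the default does not fit your batch sizes, it can be set explicitly, for example:

```
agent.channels.channel1.transactionCapacity = 100
```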