前面 FLink 的文章中咱们已经介绍了说 Flink 已经有不少自带的 Connector。html
一、《从0到1学习Flink》—— Data Source 介绍 java
二、《从0到1学习Flink》—— Data Sink 介绍node
其中包括了 Source 和 Sink 的,后面我也讲了下如何自定义本身的 Source 和 Sink。git
那么今天要作的事情是啥呢?就是介绍一下 Flink 自带的 ElasticSearch Connector,咱们今天就用他来作 Sink,将 Kafka 中的数据通过 Flink 处理后而后存储到 ElasticSearch。es6
安装 ElasticSearch,这里就忽略,本身找我之前的文章,建议安装 ElasticSearch 6.0 版本以上的,毕竟要跟上时代的节奏。github
下面就讲解一下生产环境中如何使用 Elasticsearch Sink 以及一些注意点,及其内部实现机制。apache
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency>
上面这依赖版本号请本身根据使用的版本对应改变下。服务器
下面全部的代码都没有把 import 引入到这里来,若是须要查看更详细的代码,请查看个人 GitHub 仓库地址:并发
https://github.com/zhisheng17/flink-learning/tree/master/flink-learning-connectors/flink-learning-connectors-es6elasticsearch
这个 module 含有本文的全部代码实现,固然越写到后面本身可能会作一些抽象,因此若是有代码改变很正常,请直接查看所有项目代码。
这个工具类是本身封装的,getEsAddresses 方法将传入的配置文件 es 地址解析出来,能够是域名方式,也能够是 ip + port 形式。
addSink 方法是利用了 Flink 自带的 ElasticsearchSink 来封装了一层,传入了一些必要的调优参数和 es 配置参数,下面文章还会再讲些其余的配置。
ElasticSearchSinkUtil.java
public class ElasticSearchSinkUtil { /** * es sink * * @param hosts es hosts * @param bulkFlushMaxActions bulk flush size * @param parallelism 并行数 * @param data 数据 * @param func * @param <T> */ public static <T> void addSink(List<HttpHost> hosts, int bulkFlushMaxActions, int parallelism, SingleOutputStreamOperator<T> data, ElasticsearchSinkFunction<T> func) { ElasticsearchSink.Builder<T> esSinkBuilder = new ElasticsearchSink.Builder<>(hosts, func); esSinkBuilder.setBulkFlushMaxActions(bulkFlushMaxActions); data.addSink(esSinkBuilder.build()).setParallelism(parallelism); } /** * 解析配置文件的 es hosts * * @param hosts * @return * @throws MalformedURLException */ public static List<HttpHost> getEsAddresses(String hosts) throws MalformedURLException { String[] hostList = hosts.split(","); List<HttpHost> addresses = new ArrayList<>(); for (String host : hostList) { if (host.startsWith("http")) { URL url = new URL(host); addresses.add(new HttpHost(url.getHost(), url.getPort())); } else { String[] parts = host.split(":", 2); if (parts.length > 1) { addresses.add(new HttpHost(parts[0], Integer.parseInt(parts[1]))); } else { throw new MalformedURLException("invalid elasticsearch hosts format"); } } } return addresses; } }
Main.java
public class Main { public static void main(String[] args) throws Exception { //获取全部参数 final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args); //准备好环境 StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool); //从kafka读取数据 DataStreamSource<Metrics> data = KafkaConfigUtil.buildSource(env); //从配置文件中读取 es 的地址 List<HttpHost> esAddresses = ElasticSearchSinkUtil.getEsAddresses(parameterTool.get(ELASTICSEARCH_HOSTS)); //从配置文件中读取 bulk flush size,表明一次批处理的数量,这个但是性能调优参数,特别提醒 int bulkSize = parameterTool.getInt(ELASTICSEARCH_BULK_FLUSH_MAX_ACTIONS, 40); //从配置文件中读取并行 sink 数,这个也是性能调优参数,特别提醒,这样才可以更快的消费,防止 kafka 数据堆积 int sinkParallelism = parameterTool.getInt(STREAM_SINK_PARALLELISM, 5); //本身再自带的 es sink 上一层封装了下 ElasticSearchSinkUtil.addSink(esAddresses, bulkSize, sinkParallelism, data, (Metrics metric, RuntimeContext runtimeContext, RequestIndexer requestIndexer) -> { requestIndexer.add(Requests.indexRequest() .index(ZHISHENG + "_" + metric.getName()) //es 索引名 .type(ZHISHENG) //es type .source(GsonUtil.toJSONBytes(metric), XContentType.JSON)); }); env.execute("flink learning connectors es6"); } }
配置都支持集群模式填写,注意用 , 分隔!
kafka.brokers=localhost:9092 kafka.group.id=zhisheng-metrics-group-test kafka.zookeeper.connect=localhost:2181 metrics.topic=zhisheng-metrics stream.parallelism=5 stream.checkpoint.interval=1000 stream.checkpoint.enable=false elasticsearch.hosts=localhost:9200 elasticsearch.bulk.flush.max.actions=40 stream.sink.parallelism=5
执行 Main 类的 main 方法,咱们的程序是只打印 flink 的日志,没有打印存入的日志(由于咱们这里没有打日志):
因此看起来不知道咱们的 sink 是否有用,数据是否从 kafka 读取出来后存入到 es 了。
你能够查看下本地起的 es 终端或者服务器的 es 日志就能够看到效果了。
es 日志以下:
上图是我本地 Mac 电脑终端的 es 日志,能够看到咱们的索引了。
若是还不放心,你也能够在你的电脑装个 kibana,而后更加的直观查看下 es 的索引状况(或者直接敲 es 的命令)
咱们用 kibana 查看存入 es 的索引以下:
程序执行了一会,存入 es 的数据量就很大了。
上面代码已经能够实现你的大部分场景了,可是若是你的业务场景须要保证数据的完整性(不能出现丢数据的状况),那么就须要添加一些重试策略,由于在咱们的生产环境中,颇有可能会由于某些组件不稳定性致使各类问题,因此这里咱们就要在数据存入失败的时候作重试操做,这里 flink 自带的 es sink 就支持了,经常使用的失败重试配置有:
一、bulk.flush.backoff.enable 用来表示是否开启重试机制 二、bulk.flush.backoff.type 重试策略,有两种:EXPONENTIAL 指数型(表示屡次重试之间的时间间隔按照指数方式进行增加)、CONSTANT 常数型(表示屡次重试之间的时间间隔为固定常数) 三、bulk.flush.backoff.delay 进行重试的时间间隔 四、bulk.flush.backoff.retries 失败重试的次数 五、bulk.flush.max.actions: 批量写入时的最大写入条数 六、bulk.flush.max.size.mb: 批量写入时的最大数据量 七、bulk.flush.interval.ms: 批量写入的时间间隔,配置后则会按照该时间间隔严格执行,无视上面的两个批量写入配置
看下啦,就是以下这些配置了,若是你须要的话,能够在这个地方配置扩充了。
写入 ES 的时候会有这些状况会致使写入 ES 失败:
一、ES 集群队列满了,报以下错误
12:08:07.326 [I/O dispatcher 13] ERROR o.a.f.s.c.e.ElasticsearchSinkBase - Failed Elasticsearch item request: ElasticsearchException[Elasticsearch exception [type=es_rejected_execution_exception, reason=rejected execution of org.elasticsearch.transport.TransportService$7@566c9379 on EsThreadPoolExecutor[name = node-1/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@f00b373[Running, pool size = 4, active threads = 4, queued tasks = 200, completed tasks = 6277]]]]
是这样的,我电脑安装的 es 队列容量默认应该是 200,我没有修改过。我这里若是配置的 bulk flush size * 并发 sink 数量 这个值若是大于这个 queue capacity ,那么就很容易致使出现这种由于 es 队列满了而写入失败。
固然这里你也能够经过调大点 es 的队列。参考:https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-threadpool.html
二、ES 集群某个节点挂了
这个就不用说了,确定写入失败的。跟过源码能够发现 RestClient 类里的 performRequestAsync 方法一开始会随机的从集群中的某个节点进行写入数据,若是这台机器掉线,会进行重试在其余的机器上写入,那么当时写入的这台机器的请求就须要进行失败重试,不然就会把数据丢失!
三、ES 集群某个节点的磁盘满了
这里说的磁盘满了,并非磁盘真的就没有一点剩余空间的,是 es 会在写入的时候检查磁盘的使用状况,在 85% 的时候会打印日志警告。
这里我看了下源码以下图:
若是你想继续让 es 写入的话就须要去从新配一下 es 让它继续写入,或者你也能够清空些没必要要的数据腾出磁盘空间来。
DataStream<String> input = ...; input.addSink(new ElasticsearchSink<>( config, transportAddresses, new ElasticsearchSinkFunction<String>() {...}, new ActionRequestFailureHandler() { @Override void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throw Throwable { if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) { // full queue; re-add document for indexing indexer.add(action); } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) { // malformed document; simply drop request without failing sink } else { // for all other failures, fail the sink // here the failure is simply rethrown, but users can also choose to throw custom exceptions throw failure; } } }));
若是仅仅只是想作失败重试,也能够直接使用官方提供的默认的 RetryRejectedExecutionFailureHandler ,该处理器会对 EsRejectedExecutionException 致使到失败写入作重试处理。若是你没有设置失败处理器(failure handler),那么就会使用默认的 NoOpFailureHandler 来简单处理全部的异常。
本文写了 Flink connector es,将 Kafka 中的数据读取并存储到 ElasticSearch 中,文中讲了如何封装自带的 sink,而后一些扩展配置以及 FailureHandler 状况下要怎么处理。(这个问题但是线上很容易遇到的)
原创地址为:http://www.54tianzhisheng.cn/2018/12/30/Flink-ElasticSearch-Sink/