最近工做中有这样一个ElasticSearch(如下简称ES)写入的场景,Flink处理完数据实时写入ES。如今须要将一批历史数据经过Flink加载到到ES,有两个点须要保证:git
参考ElasticSearch进阶篇(一)--版本控制,能够使用ES的版本实现该需求的开发。shell
请求写数据时加入version和version_type参数,主要代码以下:segmentfault
IndexRequest indexRequest = Requests.indexRequest() .index(indexName) .id("1") // 指定版本比较的业务字段,具体业务具体分析,通常取时间戳较为合适 .version(Long.parseLong(dataMap1.get("create").toString())) // 指定使用外部版本号 .versionType(VersionType.EXTERNAL) .source(dataMap);
验证demo可以使用当前时间的时间戳做为版本比较依据。验证思路以下:elasticsearch
验证结果以下图:工具
由截图可看到,第一步和第二步都能执行成功,第三步执行会出现版本冲突的异常,根据提示很方便能识别出缘由,即ElasticSearch进阶篇(一)--版本控制中得出的结论,使用version和version_type=EXTERNAL进行版本控制时,只有要写入文档的版本号大于已有文档的版本号才能更新成功。url
案例代码参考:elasticsearch_demospa