Multi Get API json
public static void multiGet() { // 批量查询 MultiGetResponse response = getClient().prepareMultiGet() .add("my_person", "my_index", "1")// 查询id为1的文档 .add("my_person", "my_index", "2", "3", "4")// ids,id列表 .add("telegraph", "msg", "2")// 能够查询其余索引里面的数据 .get(); // 获取相应结果 for (MultiGetItemResponse multiGetItemResponse : response) { // 遍历结果集 GetResponse getResponse = multiGetItemResponse.getResponse(); if (getResponse.isExists()) {// 判断文档是否存在 String json = getResponse.getSourceAsString(); System.out.println(json); } } }
测试并发
public static void main(String[] args) { multiGet(); }
执行结果测试
{ "name":"sean", "age":22, "salary":6000 } { "name":"sim", "age":20, "salary":5000 } { "name":"duck", "age":28, "salary":8000 } { "name":"lily", "age":20, "salary":4000 } {"title":"被更新以后title","content":"测试添加内容"}
Bulk APIui
/** * 批量操做 * @throws Exception */ public static void bulk() throws Exception { BulkRequestBuilder bulkRequest = getClient().prepareBulk(); bulkRequest.add(getClient().prepareIndex("telegraph", "msg", "3") .setSource(XContentFactory.jsonBuilder().startObject().field("title", "控股股东涉嫌内幕交易 被证监会立案调查") .field("content", "财联社7月23日讯,嘉欣丝绸晚间公告,控股股东、董事长周国建因其涉嫌内幕交易,收到中国证监会的《调查通知书》,对其进行立案调查") .endObject())); bulkRequest.add(getClient().prepareIndex("telegraph", "msg", "4") .setSource(XContentFactory.jsonBuilder().startObject().field("title", "泛海控股股价13连阳 控股股东今日再增持213万股") .field("content", "财联社7月23日讯,泛海控股晚间公告,控股股东中国泛海于7月23日增持了213.16万股公司股份,约占公司股份总数的0.0410%,成交均价为6.798 元/股") .endObject())); // 批量执行 BulkResponse bulkResponse = bulkRequest.get(); System.out.println(bulkResponse.status()); // 判断是否存在失败操做 if (bulkResponse.hasFailures()) { System.out.println("存在失败操做"); } //遍历每一个操做的执行结果 for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) { System.out.println(bulkItemResponse.getResponse().toString()); } }
测试操做spa
public static void main(String[] args) { try { bulk(); } catch (Exception e) { e.printStackTrace(); } }
BulkProcessor类提供了一个简单接口,能够根据请求的数量或大小自动刷新批量操做,也能够在给定的时间段以后自动刷新批量操做。线程
/** * 批量处理器 */ public static void bulkProcessor() { BulkProcessor.Listener listener = new BulkProcessor.Listener() { public void beforeBulk(long executionId, BulkRequest request) { // 执行批量操做以前 System.out.println(request.numberOfActions()); } public void afterBulk(long executionId, BulkRequest request, Throwable failure) { // 执行批量操做以后,异常 System.out.println("执行错误:" + request.toString() + ",失败:" + failure.getMessage()); } public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { // 执行批量操做以后 for (BulkItemResponse bulkItemResponse : response.getItems()) { System.out.println("执行成功"+bulkItemResponse.getResponse().toString()); } } }; // 设置执行器,包含执行时执行过程的监听,以及执行属性配置 BulkProcessor bulkProcessor = BulkProcessor.builder(getClient(), listener).setBulkActions(500) // 设置批量处理数量的阀值 .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))// 设置批量执执行处理请求大小阀值 .setFlushInterval(TimeValue.timeValueSeconds(5))// 设置刷新索引时间间隔 .setConcurrentRequests(1)// 设置并发处理线程个数 .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(100), 3))// 设置回滚策略,等待时间100,重试次数3 .build(); // 添加须要执行的请求 bulkProcessor.add(new DeleteRequest("telegraph", "msg", "3")); bulkProcessor.add(new DeleteRequest("telegraph", "msg", "4")); // 刷新请求 bulkProcessor.flush(); // 关闭执行器 bulkProcessor.close(); //刷新索引(没有这一步不执行) getClient().admin().indices().prepareRefresh().get(); }
测试code
public static void main(String[] args) { try { bulkProcessor(); } catch (Exception e) { e.printStackTrace(); } }
根据查询条件,删除知足条件的文档索引
/** * 根据查询条件删除文档 */ public static void deleteQuery() { //根据查询条件删除文档 BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(getClient()) .filter(QueryBuilders.matchQuery("title", "长生生物")).source("telegraph").get(); System.out.println(response.getDeleted());// 删除文档数量 }
测试接口
public static void main(String[] args) { try { deleteQuery(); } catch (Exception e) { e.printStackTrace(); } }