ES18-JAVA API 批量操做

1.批量查询

Multi Get API json

public static void multiGet() {
		// 批量查询
		MultiGetResponse response = getClient().prepareMultiGet()
				.add("my_person", "my_index", "1")// 查询id为1的文档
				.add("my_person", "my_index", "2", "3", "4")// ids,id列表
				.add("telegraph", "msg", "2")// 能够查询其余索引里面的数据
				.get();
		// 获取相应结果
		for (MultiGetItemResponse multiGetItemResponse : response) { // 遍历结果集
			GetResponse getResponse = multiGetItemResponse.getResponse();
			if (getResponse.isExists()) {// 判断文档是否存在
				String json = getResponse.getSourceAsString();
				System.out.println(json);
			}
		}

	}

测试并发

public static void main(String[] args) {
		multiGet();
	}

执行结果测试

{
  "name":"sean",
  "age":22,
  "salary":6000
}

{
  "name":"sim",
  "age":20,
  "salary":5000
}

{
  "name":"duck",
  "age":28,
  "salary":8000
}

{
  "name":"lily",
  "age":20,
  "salary":4000
}

{"title":"被更新以后title","content":"测试添加内容"}

2.批量操做

Bulk APIui

/**
	 * 批量操做
	 * @throws Exception
	 */
	public static void bulk() throws Exception {
		BulkRequestBuilder bulkRequest = getClient().prepareBulk();

		bulkRequest.add(getClient().prepareIndex("telegraph", "msg", "3")
				.setSource(XContentFactory.jsonBuilder().startObject().field("title", "控股股东涉嫌内幕交易 被证监会立案调查")
						.field("content", "财联社7月23日讯,嘉欣丝绸晚间公告,控股股东、董事长周国建因其涉嫌内幕交易,收到中国证监会的《调查通知书》,对其进行立案调查")
						.endObject()));
		bulkRequest.add(getClient().prepareIndex("telegraph", "msg", "4")
				.setSource(XContentFactory.jsonBuilder().startObject().field("title", "泛海控股股价13连阳 控股股东今日再增持213万股")
						.field("content",
								"财联社7月23日讯,泛海控股晚间公告,控股股东中国泛海于7月23日增持了213.16万股公司股份,约占公司股份总数的0.0410%,成交均价为6.798 元/股")
						.endObject()));
		// 批量执行
		BulkResponse bulkResponse = bulkRequest.get();
		System.out.println(bulkResponse.status());
		// 判断是否存在失败操做
		if (bulkResponse.hasFailures()) {
			System.out.println("存在失败操做");
		}
		//遍历每一个操做的执行结果
		for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
			System.out.println(bulkItemResponse.getResponse().toString());
		}
	}

测试操做spa

public static void main(String[] args) {
		try {
			bulk();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

3.批量处理器(Bulk Processor)

BulkProcessor类提供了一个简单接口,能够根据请求的数量或大小自动刷新批量操做,也能够在给定的时间段以后自动刷新批量操做。线程

/**
	 * 批量处理器
	 */
	public static void bulkProcessor() {

		BulkProcessor.Listener listener = new BulkProcessor.Listener() {

			public void beforeBulk(long executionId, BulkRequest request) {
				// 执行批量操做以前
				System.out.println(request.numberOfActions());
			}

			public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
				// 执行批量操做以后,异常
				System.out.println("执行错误:" + request.toString() + ",失败:" + failure.getMessage());
			}

			public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
				// 执行批量操做以后
				for (BulkItemResponse bulkItemResponse : response.getItems()) {
					System.out.println("执行成功"+bulkItemResponse.getResponse().toString());
				}
			}
		};

		// 设置执行器,包含执行时执行过程的监听,以及执行属性配置
		BulkProcessor bulkProcessor = BulkProcessor.builder(getClient(), listener).setBulkActions(500) // 设置批量处理数量的阀值
				.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))// 设置批量执执行处理请求大小阀值
				.setFlushInterval(TimeValue.timeValueSeconds(5))// 设置刷新索引时间间隔
				.setConcurrentRequests(1)// 设置并发处理线程个数
				.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(100), 3))// 设置回滚策略,等待时间100,重试次数3
				.build();

		// 添加须要执行的请求
		bulkProcessor.add(new DeleteRequest("telegraph", "msg", "3"));
		bulkProcessor.add(new DeleteRequest("telegraph", "msg", "4"));
		// 刷新请求
		bulkProcessor.flush();
		// 关闭执行器
		bulkProcessor.close();
		//刷新索引(没有这一步不执行)
		getClient().admin().indices().prepareRefresh().get();
	}

测试code

public static void main(String[] args) {
		try {
			bulkProcessor();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

4.查询删除

根据查询条件,删除知足条件的文档索引

/**
	 * 根据查询条件删除文档
	 */
	public static void deleteQuery() {

		//根据查询条件删除文档
		BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(getClient())
				.filter(QueryBuilders.matchQuery("title", "长生生物")).source("telegraph").get();

		System.out.println(response.getDeleted());// 删除文档数量

	}

测试接口

public static void main(String[] args) {
		try {
			deleteQuery();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
相关文章
相关标签/搜索