ElasticSearch-Java-low-level-rest-client官方文档翻译

       人肉翻译,非谷歌机翻,部分地方添加了我的的理解,并作了分割,若有错误请在评论指出。转载请指明原连接,尊重我的劳动成果。node

       High-Level-Rest-Client基于Low-Level-Rest-Client封装,Client配置方面基于Low-Level,而API使用上基于High-Level。apache

       翻译的版本为6.5.4版本的Elasticsearch,部分不重要的内容(如Maven/Gradle坐标、License等不在本文出现)。编程

       在实际的配置过程当中,查看源码发现LowLevelClient彻底基于HttpAsyncClient来实现的,内部具体组装Client的细节,若是有时间会写另一篇博文分享出来。json

 
低级别rest-client包含了以下特性:
  • 最小的依赖
  • 在集群全部结点间负载均衡
  • 结点失败后,根据具体的响应码(response code)进行失效转移
  • 链接失败惩罚机制(是否重连一个失败的结点取决于它失败的次数;尝试从新链接失败的次数越多,客户端下一次尝试从新链接该结点的等待时间也越长)
  • 持久化链接
  • 请求和响应的日志追踪
  • (可选功能)集群结点自动发现
 

一、初始化客户端

1.1.基本设置

        RestClient实例只须要经过RestClientBuilder类进行构建就行,以下所示,能够提供多个链接实例的HttpHost(每个HttpHost都是一个结点Node):

1 RestClient restClient = RestClient.builder(
2     new HttpHost("localhost", 9200, "http"),
3     new HttpHost("localhost", 9201, "http")).build();
        RestClient是线程安全的,而且和它依附的应用有相同的生命周期。须要注意的是,当不须要Client的时候,须要关闭它以便正确的释放其持有的资源(这些资源有底层的http client实例与它的线程):
1 restClient.close();

 

 1.2.参数化设置

   RestClientBuilder一样容许参数化的设置以下参数来用于构建RestClient实例:
  • 设置默认的请求头,该请求头会应用到每一个request上去。不容许单独的在每次请求时进行请求头设置。
1 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
2 Header[] defaultHeaders = new Header[]{new BasicHeader("header", "value")};
3 builder.setDefaultHeaders(defaultHeaders);
  • 设置同一个请求的屡次尝试时的最大超时时间,这个设置默认的时间是30秒(同socket timeout的默认超时时间同样)。若是你修改了socket timeout,你也应该修改这个值来适应socket timeout。
1 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
2 builder.setMaxRetryTimeoutMillis(10000); 
  • 设置监听器,当一个结点故障时收到通知,以采起措施。嗅探失败节点功能会隐式的使用一个这样的监听器。
1 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
2 builder.setFailureListener(new RestClient.FailureListener() {
3     @Override
4     public void onFailure(Node node) {
5         
6     }
7 });
  • 设置结点选择器,用于过滤某些es的结点(这些结点在初始化时设置到Client中)。当结点嗅探开启时,若是你不想将请求发送至dedicated master结点上,该功能很好用。Client默认会将请求发送到全部配置的节点上。
1 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
2 builder.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS);
  • 经过一个回调函数修改默认的请求配置(如超时时间、验证信息等其余apache httpclient包中RequestConfig.Builder容许的配置)。
1 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
2 builder.setRequestConfigCallback(
3     new RestClientBuilder.RequestConfigCallback() {
4         @Override
5         public RequestConfig.Builder customizeRequestConfig(
6                 RequestConfig.Builder requestConfigBuilder) {
7             return requestConfigBuilder.setSocketTimeout(10000); 
8         }
9     });
  • 经过一个回调函数修改默认的HttpClient配置(如ssl配置等其余apache httpclient包中HttpAsyncClientBuilder容许的配置)。
1 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
2 builder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
3         @Override
4         public HttpAsyncClientBuilder customizeHttpClient(
5                 HttpAsyncClientBuilder httpClientBuilder) {
6             return httpClientBuilder.setProxy(
7                 new HttpHost("proxy", 9000, "http"));  
8         }
9     });

 

二、执行请求

2.1.同步与异步API

        一旦Client建立,就能够经过performRequest或performRequestAsync来执行请求:
(1)performRequest是同步请求,它会阻塞调用线程,在请求成功或者请求失败抛出异常以后,返回Response;
1 Request request = new Request(
2     "GET",   //HTTP方法,支持GET、POST、HEAD等
3     "/");    //es服务端点endpoint
4 Response response = restClient.performRequest(request);
(2)performRequestAsync是异步请求,它接收一个ResponseListener参数,当请求成功或者请求失败抛出异常以后,该Listener会被调起并传入Response或Exception到相应方法;
 1 Request request = new Request(
 2     "GET",   //HTTP方法,支持GET、POST、HEAD等
 3     "/");    //es服务端点endpoint
 4 restClient.performRequestAsync(request, new ResponseListener() {
 5     @Override
 6     public void onSuccess(Response response) {
 7         //成功回调
 8     }
 9 
10     @Override
11     public void onFailure(Exception exception) {
12         //失败回调
13     }
14 });
        另外,你能够向Request对象中添加请求参数,以下:
1 request.addParameter("pretty", "true");

        若是将HttpEntity设置为String,那么ContentType会被默认设置为application/json:api

2.2.请求可选项

        RequestOptions这个类用于设置请求可选项,咱们建议在同一个应用中,对于使用一样的请求可选项的Request共享一个RequestOption实例,你能够经过单例来实现:
1 private static final RequestOptions COMMON_OPTIONS;
2 static {
3     RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
4     builder.addHeader("Authorization", "Bearer " + TOKEN);  //设置请求头的校验信息
5     builder.setHttpAsyncResponseConsumerFactory( //设置Response的缓冲池大小          
6         new HeapBufferedResponseConsumerFactory(30 * 1024 * 1024 * 1024)); 
7     COMMON_OPTIONS = builder.build();
8 }
  • 这里有一个addHeader方法,须要注意的是,在这里不须要设置ContentType,由于在设置HttpEntity时就会自动关联ContentType;
  • 你能够在这里设置NodeSelector。NodeSelector.NOT_MASTER_ONLY是一个比较好的选择;
  • 你能够在这里定制用于缓冲异步响应的Response Consumer。默认的ResponseConsumer会在JVM堆上缓存100M的响应数据。若是响应数据比缓冲区过大,当次请求会失败。若是你的应用跑在一个JVM堆比较小的环境中,你须要下降缓冲区的最大值。
        配置好RequestOptions以后,咱们在每个共享该配置的Request中,简单设置一下该对象便可:
1 request.setOptions(COMMON_OPTIONS);

        若是有某些请求须要对OPTIONS作必定程度的变动,能够经过以下方式增量处理而不是从新写一遍设置代码:数组

1 RequestOptions.Builder options = COMMON_OPTIONS.toBuilder();
2 options.addHeader("cats", "knock things off of other things");
3 request.setOptions(options); 

2.3.批量并行异步请求

        下面是一个使用闭锁并行执行异步请求的例子。在实际的编程中,你通常会倾向于使用_bulk这个API来替代,固然了,下面的例子依然具备参考性:缓存

 1 final CountDownLatch latch = new CountDownLatch(documents.length);
 2 for (int i = 0; i < documents.length; i++) {
 3     Request request = new Request("PUT", "/posts/doc/" + i);
 4     //let's assume that the documents are stored in an HttpEntity array
 5     request.setEntity(documents[i]);
 6     restClient.performRequestAsync(
 7             request,
 8             new ResponseListener() {
 9                 @Override
10                 public void onSuccess(Response response) {
11                     
12                     latch.countDown();
13                 }
14 
15                 @Override
16                 public void onFailure(Exception exception) {
17                     
18                     latch.countDown();
19                 }
20             }
21     );
22 }
23 latch.await();

 

三、读取响应

3.1.获取响应对象

        响应对象,不管是经过同步仍是异步获取到的(同步经过API返回值获取,异步经过回调方法中的参数获取),都是同一个类的对象。该对象包装了http client原始的响应对象以及一些附加信息:
1 Response response = restClient.performRequest(new Request("GET", "/"));
2 RequestLine requestLine = response.getRequestLine(); 
3 HttpHost host = response.getHost(); 
4 int statusCode = response.getStatusLine().getStatusCode(); 
5 Header[] headers = response.getHeaders(); 
6 String responseBody = EntityUtils.toString(response.getEntity()); 
          若是执行请求失败的话,你将会获得一个异常,异常有如下几种:
(1)IOException:通信问题,例如SocketTimeoutException;
(2)ResponseException:响应已经返回,可是其状态码不为2XX时将抛出这个异常。ResponseException源自于一个有效的http Response,所以该异常会将源Response暴露出来,并容许你获取它。
        注意:HEAD请求若是返回了404,ResponseException是不会被抛出的,由于对于HEAD来讲资源未找到也是该方法指望的返回值之一。全部其余的HTTP方法对于404返回码都会抛出一个ResponseException,除非在ignore参数中包含了404。ignore是一个特殊的参数,该参数不会发送给es,它包含了一个分割的error status code列表。当Response的状态码在这个列表中时,它会被当成Response而不是做为Exception。对于GET方法相关的API,这是一个至关有用的特性,好比GET查询一个文档返回404,咱们指望其不返回一个异常,而是一个没有document的response对象。
        注意:low-level-client并不会给出json相关的编组与解组API。你能够自由选择相关的库来进行json的编组与解组。
        底层的Apache Async Http Client库附带了不一样的org.apache.http.HttpEntity实现,这些实现容许以不一样的格式(流,字节数组,字符串等)提供请求主体。至于读取响应体,HttpEntity#getContent方法很方便,它返回从先前缓冲的响应体读取的InputStream。做为替代方案,能够提供一个自定义的org.apache.http.nio.protocol.HttpAsyncResponseConsumer来控制字节的读取和缓冲方式。
 

四、通用配置

        在初始化客户端中,咱们看到了RestClientBuilder支持提供一个RequestConfigCallback与一个可以用于个性化异步请求的HttpClientConfigCallback。这些回调函数可以让你在不覆盖每个RestClient初始化默认值的状况下修改某些指定行为。本小节咱们描述一些须要为low-level-client添加额外配置的公共场景。

4.1.Timeouts

        在初始化客户端的参数化设置就提过了,能够经过以下形式进行设置:
 1 RestClientBuilder builder = RestClient.builder(
 2     new HttpHost("localhost", 9200))
 3     .setRequestConfigCallback(
 4         new RestClientBuilder.RequestConfigCallback() {
 5             @Override
 6             public RequestConfig.Builder customizeRequestConfig(
 7                     RequestConfig.Builder requestConfigBuilder) {
 8                 return requestConfigBuilder
 9                     .setConnectTimeout(5000) //默认值1S
10                     .setSocketTimeout(60000); //默认值30S
11             }
12         })
13     .setMaxRetryTimeoutMillis(60000); //设置完SocketTimeout后,通常须要联动修改这个值

4.2.线程数

        ApacheHttpAsyncClient基于IOReactor模式,在启动时会建立一个分发线程和一组工做线程,它们由Connection Manager使用,工做线程的默认数量与可以检测到的CPU核心数一致(Runtime.getRuntime().availableProcessors()的返回结果)。能够经过以下方法修改工做线程数:
 1 RestClientBuilder builder = RestClient.builder(
 2     new HttpHost("localhost", 9200))
 3     .setHttpClientConfigCallback(new HttpClientConfigCallback() {
 4         @Override
 5         public HttpAsyncClientBuilder customizeHttpClient(
 6                 HttpAsyncClientBuilder httpClientBuilder) {
 7             return httpClientBuilder.setDefaultIOReactorConfig(
 8                 IOReactorConfig.custom()
 9                     .setIoThreadCount(1)
10                     .build());
11         }
12     }); 

4.3.身份验证

        使用以下的方式设置身份验证相关的请求参数:
 1 final CredentialsProvider credentialsProvider =
 2     new BasicCredentialsProvider();
 3 credentialsProvider.setCredentials(AuthScope.ANY,
 4     new UsernamePasswordCredentials("user", "password"));
 5 
 6 RestClientBuilder builder = RestClient.builder(
 7     new HttpHost("localhost", 9200))
 8     .setHttpClientConfigCallback(new HttpClientConfigCallback() {
 9         @Override
10         public HttpAsyncClientBuilder customizeHttpClient(
11                 HttpAsyncClientBuilder httpClientBuilder) {
12             return httpClientBuilder
13                 .setDefaultCredentialsProvider(credentialsProvider);
14         }
15     });
         抢占式(这里翻译不太肯定,结合上下文,应该指的是在第一次请求就发送身份验证信息)身份验证能够被禁用,这意味着每一个HTTP请求都会在没有身份验证请求头的状况下发送,以查看它是否被接受,而且在收到HTTP 401响应后,它将使用含有身份验证请求头的报文从新发送彻底相同的请求。 若是您但愿这样作,那么您能够经过HttpAsyncClientBuilder禁用它:
 1 final CredentialsProvider credentialsProvider =
 2     new BasicCredentialsProvider();
 3 credentialsProvider.setCredentials(AuthScope.ANY,
 4     new UsernamePasswordCredentials("user", "password"));
 5 
 6 RestClientBuilder builder = RestClient.builder(
 7     new HttpHost("localhost", 9200))
 8     .setHttpClientConfigCallback(new HttpClientConfigCallback() {
 9         @Override
10         public HttpAsyncClientBuilder customizeHttpClient(
11                 HttpAsyncClientBuilder httpClientBuilder) {
12             httpClientBuilder.disableAuthCaching();  /* 禁用抢占式身份验证 */
13             return httpClientBuilder
14                 .setDefaultCredentialsProvider(credentialsProvider);
15         }
16     });

4.4.SSL通讯

        加密通讯一样能够经过HttpClientConfigCallback进行配置。HttpAsyncClientBuilder经过3个暴露的API:setSSLContext、setSSLSessionStrategy、setConnectionManager进行加密通讯的设置。下面是一个配置示例(若是没有显式配置,则会使用系统的默认配置):
 1 KeyStore truststore = KeyStore.getInstance("jks");
 2 try (InputStream is = Files.newInputStream(keyStorePath)) {
 3     truststore.load(is, keyStorePass.toCharArray());
 4 }
 5 SSLContextBuilder sslBuilder = SSLContexts.custom()
 6     .loadTrustMaterial(truststore, null);
 7 final SSLContext sslContext = sslBuilder.build();
 8 RestClientBuilder builder = RestClient.builder(
 9     new HttpHost("localhost", 9200, "https"))
10     .setHttpClientConfigCallback(new HttpClientConfigCallback() {
11         @Override
12         public HttpAsyncClientBuilder customizeHttpClient(
13                 HttpAsyncClientBuilder httpClientBuilder) {
14             return httpClientBuilder.setSSLContext(sslContext);
15         }
16     });

4.5.节点选择器

        客户端默认状况下会以轮询的形式将请求发送到每个初始化时配置的节点上。在初始化时,咱们能够提供一个节点选择器NodeSelector。当嗅探开启时这个功能会很好用,以防只有HTTP请求才能访问专用主节点。 对于每一个请求,客户端将运行最终配置的节点选择器以过滤候选节点,而后从列表中选择下一个节点选择器。下面是一个基于机架进行节点选择的例子:
 1 RestClientBuilder builder = RestClient.builder(
 2         new HttpHost("localhost", 9200, "http"));
 3 builder.setNodeSelector(new NodeSelector() { 
 4     @Override
 5     public void select(Iterable<Node> nodes) {
 6         /*
 7          * Prefer any node that belongs to rack_one. If none is around
 8          * we will go to another rack till it's time to try and revive
 9          * some of the nodes that belong to rack_one.
10          */
11         boolean foundOne = false;
12         for (Node node : nodes) {
13             String rackId = node.getAttributes().get("rack_id").get(0);
14             if ("rack_one".equals(rackId)) {
15                 foundOne = true;
16                 break;
17             }
18         }
19         if (foundOne) {
20             Iterator<Node> nodesIt = nodes.iterator();
21             while (nodesIt.hasNext()) {
22                 Node node = nodesIt.next();
23                 String rackId = node.getAttributes().get("rack_id").get(0);
24                 if ("rack_one".equals(rackId) == false) {
25                     nodesIt.remove();
26                 }
27             }
28         }
29     }
30 });
         上面的例子中,优先选择一号机架上的节点。当一号机架没有任何节点时,才会选择其余机架节点。
       须要注意的是,若是一个NodeSelector没法提供一个始终如一的(或者说稳定的)节点列表,那么轮询发送请求至不一样node的行为将会变得不可预测或失效。在上面的例子中,轮询行为会很好的工做,由于这个例子中的代码能够很好的(或比较稳定的)获取到哪些结点可用(这些获取到的可用结点可以影响轮询行为的可预测性)。所以,这也提醒咱们NodeSelector不该该依赖于任何外部因素,不然轮询行为将会变的不可预测。
 

五、嗅探Sniffer

        容许从一个运行的ES集群中自动发现结点,而且将发现的结点设置入一个已存在的RestClient。默认的,它使用节点信息api检索属于集群的节点,并使用jackson解析得到的json响应。
        要使用该功能,必须引入依赖,该依赖版本同rest-client版本同样,跟着es的发布版本走的:
1 <dependency>
2     <groupId>org.elasticsearch.client</groupId>
3     <artifactId>elasticsearch-rest-client-sniffer</artifactId>
4     <version>6.5.4</version>
5 </dependency>

5.1.基本设置

        在初始化的RestClient的时候将一个Sniffer与其进行关联。Sniffer嗅探器会为RestClient提供一个周期性的拉取es全部结点列表的功能(默认5分钟拉取一次),拉取到节点列表后,会调用RestClient.setNodes的API将其更新入RestClient。
1 RestClient restClient = RestClient.builder(
2     new HttpHost("localhost", 9200, "http"))
3     .build();
4 Sniffer sniffer = Sniffer.builder(restClient).build(); 
        Sniffer应该具备同RestClient同样的生命周期,也就是说,在关闭RestClient时,也须要关闭Sniffer以释放其占用的资源:
1 sniffer.close();
2 restClient.close(); 

5.2.定制刷新间隔

        上面咱们提到了,Sniffer默认5分钟刷新一次,咱们能够在初始化Sniffer的时候改变这个时间(单为:毫秒):
1 RestClient restClient = RestClient.builder(
2     new HttpHost("localhost", 9200, "http"))
3     .build();
4 Sniffer sniffer = Sniffer.builder(restClient)
5     .setSniffIntervalMillis(60000).build(); //设置Sniffer刷新间隔
        另外,咱们能够在请求失败上启用Sniffer,即在每次请求节点失败后,可让Sniffer执行一次节点更新(区别于Sniffer固定间隔的刷新)。咱们须要为RestClient提供一个SnifferOnFailureListener,并在Client初始化的时候设置这个监听器,同时在建立Sniffer以后也须要将其设置给这个监听器。这样一来,每次失败以后,监听器监听到失败就会委托Sniffer去执行节点刷新:
 1 SniffOnFailureListener sniffOnFailureListener =
 2     new SniffOnFailureListener(); //建立Sniffer失败监听器
 3 RestClient restClient = RestClient.builder(
 4     new HttpHost("localhost", 9200))
 5     .setFailureListener(sniffOnFailureListener) //为RestClient设置监听器
 6     .build();
 7 Sniffer sniffer = Sniffer.builder(restClient)
 8     .setSniffAfterFailureDelayMillis(30000) //设置失败以后的额外调度延迟
 9     .build();
10 sniffOnFailureListener.setSniffer(sniffer); //为监听器设置Sniffer
  • 关于setSniffAfterFailureDelayMillis:当咱们在失败上设置了Sniffer监听后,每次失败不只会经过监听器勾起一次Sniffer的节点刷新行为,一次额外的Sniffer节点刷新行为也会比日常的间隔调度更快的到来(这是Sniffer的默认特征,且默认时间为1分钟)。假设失败的节点可以很快的恢复,咱们的应用须要尽快知道这个状况,那么咱们能够经过setSniffAfterFailureDelayMillis来调整这个额外Sniffer行为的延迟时间(这是什么意思呢?举个例子,当请求失败后,经过监听器触发的Sniffer行为获取到的节点列表多是剔除了有问题节点的列表,若是此时问题节点未恢复的话;可能这些问题节点在一阵子后自行恢复了,可是Sniffer尚未到下次触发时间,所以咱们须要调整一下这个失败后Sniffer额外调度的延迟时间,缩短该时间以尽快拉取到恢复后的节点列表)。

5.3.设置节点通讯协议

         Sniffer的节点列表刷新行为使用的是NodeInfo API,这个API只能返回节点的IP和PORT信息,对于链接节点使用到的协议没法返回,所以默认认为节点通讯使用的是HTTP协议。在使用HTTPS协议的状况下,咱们须要手工的设置一个ElasticsearchNodesSniffer实例用来告知Sniffer节点通讯的协议是HTTPS:
1 RestClient restClient = RestClient.builder(
2         new HttpHost("localhost", 9200, "http"))
3         .build();
4 NodesSniffer nodesSniffer = new ElasticsearchNodesSniffer(
5         restClient,
6         ElasticsearchNodesSniffer.DEFAULT_SNIFF_REQUEST_TIMEOUT,
7         ElasticsearchNodesSniffer.Scheme.HTTPS);
8 Sniffer sniffer = Sniffer.builder(restClient)
9         .setNodesSniffer(nodesSniffer).build();
        经过一样的方式,咱们能够设置SnifferRequestTimeout这个参数的值(默认1S)。当调用NodeInfoAPI时,这个超时参数会做为查询字符串中的参数,它告诉ES服务器查询节点列表的超时时间。所以当服务器侧出现超时的时候仍然会返回一个有效的响应结果(服务器侧查询节点列表的超时,不表明Sniffer请求自身的超时),这个响应结果可能只包含节点列表的一个子集,即在超时时间内能获取到的节点列表数据:
1 RestClient restClient = RestClient.builder(
2     new HttpHost("localhost", 9200, "http"))
3     .build();
4 NodesSniffer nodesSniffer = new ElasticsearchNodesSniffer(
5     restClient,
6     TimeUnit.SECONDS.toMillis(5),
7     ElasticsearchNodesSniffer.Scheme.HTTP);
8 Sniffer sniffer = Sniffer.builder(restClient)
9     .setNodesSniffer(nodesSniffer).build(); 

5.4.自定义NodesSniffer

        有时候,咱们须要自定义一个NodesSniffer完成一些个性化的特殊功能,好比从一个内部资源中获取节点列表(如文件)而不是从ES服务器集群中获取节点列表:
 1 RestClient restClient = RestClient.builder(
 2     new HttpHost("localhost", 9200, "http"))
 3     .build();
 4 NodesSniffer nodesSniffer = new NodesSniffer() {
 5         @Override
 6         public List<Node> sniff() throws IOException {
 7             return null; 
 8         }
 9     };
10 Sniffer sniffer = Sniffer.builder(restClient)
11     .setNodesSniffer(nodesSniffer).build();
相关文章
相关标签/搜索