Writing Data to ElasticSearch with Elastic4s

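Elastic4s is a Scala library for working with ElasticSearch. Its GitHub address is: elastic4s/samples/elastic4s-tcp-client-maven at master · sksamuel/elastic4s · GitHub, and its documentation is at: Elastic4s. This post records the problems I ran into while using Elastic4s to write data to ElasticSearch, and how I solved them.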

1. Ways to Write Data

1.1 Over HTTP

import com.sksamuel.elastic4s.ElasticsearchClientUri
import com.sksamuel.elastic4s.http.HttpClient
import com.sksamuel.elastic4s.http.ElasticDsl._

// Provides the implicit ExecutionContext that onComplete needs (async support)
import scala.concurrent.ExecutionContext.Implicits.global

object HTTPDemo extends App {
    val url = "localhost"
    val port = 9200
    val client = HttpClient(ElasticsearchClientUri(url, port))
    client.execute {
        bulk(
          indexInto("myindex" / "mytype").fields("country" -> "Mongolia", "capital" -> "Ulaanbaatar"),
          indexInto("myindex" / "mytype").fields("country" -> "Namibia", "capital" -> "Windhoek")
        )
    }.onComplete({
        case _ => {
            client.close()
        }
    })
}

1.2 Over TCP

import com.sksamuel.elastic4s.ElasticsearchClientUri
import com.sksamuel.elastic4s.TcpClient
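// Note the changed import: over TCP the DSL comes from com.sksamuel.elastic4s.ElasticDsl, not com.sksamuel.elastic4s.http.ElasticDsl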
import com.sksamuel.elastic4s.ElasticDsl._

// Provides the implicit ExecutionContext that onComplete needs (async support)
import scala.concurrent.ExecutionContext.Implicits.global

object TCPDemo extends App {
    val url = "localhost"
    val port = 9300  // transport (TCP) port; 9200 is the HTTP port
    val clusterName = "mycluster"

    // The main difference is here
    val client = TcpClient.transport(ElasticsearchClientUri(s"elasticsearch://${url}:${port}?cluster.name=${clusterName}"))
  
    client.execute {
        bulk(
          indexInto("myindex" / "mytype").fields("country" -> "Mongolia", "capital" -> "Ulaanbaatar"),
          indexInto("myindex" / "mytype").fields("country" -> "Namibia", "capital" -> "Windhoek")
        )
    }.onComplete({
        case _ => {
            client.close()
        }
    })
}

2. Problems and Solutions

2.1 available.processors error when connecting over TCP

Error message:

Getting availableProcessors is already set to [n], rejecting [n] IllegalStateException exception

Solution:

Before creating the TCP connection, run:

System.setProperty("es.set.netty.runtime.available.processors", "false");
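A minimal sketch of where this call belongs, assuming the client is created in a main method: the property has to be set before the first TcpClient is built. The names TcpBootstrap and disableNettyProcessorsCheck are hypothetical, and the elastic4s call is left as a comment so the snippet compiles without the library.

```scala
object TcpBootstrap {
  // System property from the workaround above
  val NettyProcessorsKey = "es.set.netty.runtime.available.processors"

  // Hypothetical helper: asks the Elasticsearch transport layer not to set
  // Netty's available-processors value itself, which avoids the
  // IllegalStateException when that value was already set elsewhere in the JVM.
  def disableNettyProcessorsCheck(): Unit =
    System.setProperty(NettyProcessorsKey, "false")

  def main(args: Array[String]): Unit = {
    // 1. Set the property first ...
    disableNettyProcessorsCheck()
    // 2. ... and only then create the TCP client, for example:
    // val client = TcpClient.transport(ElasticsearchClientUri("elasticsearch://localhost:9300"))
    println(System.getProperty(NettyProcessorsKey))
  }
}
```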

2.2 TCP connections keep timing out

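Solution:

By default, ElasticSearch opens an HTTP port on 9200 and a TCP (transport) port taken from the range 9300 to 9400. Out of the box these are bound only to the local loopback address, so remote clients cannot reach them.

For remote connections, HTTP needs the setting network.host: 0.0.0.0, and TCP needs transport.host: 0.0.0.0 (both in elasticsearch.yml).

For details, see the official documentation: [Transport | Elasticsearch Reference [5.6] | Elastic](https://www.elastic.co/guide/...)

For the difference between publish_host and bind_host, see: networking - What's the difference between bind_host and publish_host in ElasticSearch? - Stack Overflow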
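When the connection times out, it can help to check from the client machine whether the HTTP (9200) and transport (9300) ports are reachable at all. The sketch below is a plain TCP connect test using only the JDK; PortCheck and isReachable are hypothetical names, and a successful connect only shows that the port is open, not that the node will accept the client.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

object PortCheck {
  // Returns true if a TCP connection to host:port succeeds within timeoutMs.
  def isReachable(host: String, port: Int, timeoutMs: Int = 2000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Covers timeouts, refused connections and unknown hosts
      case _: IOException => false
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val host = if (args.nonEmpty) args(0) else "localhost"
    for (port <- Seq(9200, 9300))
      println(s"$host:$port reachable = ${isReachable(host, port)}")
  }
}
```

If both ports answer on the ElasticSearch host itself but not from the client machine, the bind settings above (or a firewall) are the likely cause.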

2.3 NoNodeAvailableException when connecting over TCP

Error message:

NoNodeAvailableException : None of the configured nodes are available

Solution:

If you keep the default cluster name, that is, you have not changed the cluster.name: xxx setting, you can create the TCP connection like this:

val url = "localhost"
val port = 9300  // transport (TCP) port, not the 9200 HTTP port

val client = TcpClient.transport(ElasticsearchClientUri(url, port))

But if you have changed the cluster name, the connection must be created like this instead:

val url = "localhost"
val port = 9300  // transport (TCP) port
val clusterName = "mycluster"

val client = TcpClient.transport(ElasticsearchClientUri(s"elasticsearch://${url}:${port}?cluster.name=${clusterName}"))

If the cluster has several nodes, you can list them all (listing just one of them also works):

elasticsearch://host1:9300,host2:9300,host3:9300?cluster.name=my-cluster
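If the node list comes from configuration, the URI string can be assembled instead of hard-coded. A small sketch, assuming the nodes are given as (host, port) pairs; TransportUri.build is a hypothetical helper, and its result would be passed to ElasticsearchClientUri(...) and TcpClient.transport as in the examples above.

```scala
object TransportUri {
  // Builds "elasticsearch://h1:p1,h2:p2?cluster.name=<name>" from a node list.
  def build(nodes: Seq[(String, Int)], clusterName: String): String = {
    require(nodes.nonEmpty, "at least one node is required")
    val hostList = nodes.map { case (host, port) => s"$host:$port" }.mkString(",")
    s"elasticsearch://$hostList?cluster.name=$clusterName"
  }

  def main(args: Array[String]): Unit = {
    val uri = build(Seq("host1" -> 9300, "host2" -> 9300, "host3" -> 9300), "my-cluster")
    println(uri)
    // With elastic4s on the classpath:
    // val client = TcpClient.transport(ElasticsearchClientUri(uri))
  }
}
```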

2.4 Missing Jackson methods

Error message:

Exception in thread "main" java.lang.NoSuchMethodError: com.fasterxml.jackson.core.JsonFactory.xxx

Solution:

The dependencies you pull in may bring in Jackson components with mismatched versions. In that case, declare the version of each component explicitly in pom.xml:

<properties>
    <jackson.version>2.8.8</jackson.version>
</properties>

...

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>${jackson.version}</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-annotations</artifactId>
    <version>${jackson.version}</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>${jackson.version}</version>
</dependency>
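To confirm which Jackson jars actually end up on the classpath, a quick runtime check can print where each class was loaded from (mvn dependency:tree -Dincludes=com.fasterxml.jackson.core gives a similar view at build time). The sketch below uses only JDK reflection; WhichJar and locate are hypothetical names.

```scala
object WhichJar {
  // Returns the jar or directory a class was loaded from, or None if the class
  // is missing or has no code source (e.g. JDK bootstrap classes).
  def locate(className: String): Option[String] =
    try {
      val cls = Class.forName(className, false, getClass.getClassLoader)
      Option(cls.getProtectionDomain.getCodeSource)
        .flatMap(cs => Option(cs.getLocation))
        .map(_.toString)
    } catch {
      case _: ClassNotFoundException => None
    }

  def main(args: Array[String]): Unit = {
    // If these point at jars with different version numbers, the versions are mismatched.
    Seq(
      "com.fasterxml.jackson.core.JsonFactory",
      "com.fasterxml.jackson.databind.ObjectMapper",
      "com.fasterxml.jackson.annotation.JsonProperty"
    ).foreach(name => println(s"$name -> ${locate(name).getOrElse("not found")}"))
  }
}
```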

2.5 Synchronous and asynchronous calls

In Elastic4s, over either HTTP or TCP, you can choose whether a call is made synchronously or asynchronously:

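// Synchronous: .await blocks until the response arrives
// (bulk stands for a bulk(...) request such as the one in section 1)
client.execute {bulk}.await

// Asynchronous: returns a Future immediately
client.execute {bulk}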
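Both calls produce the same Future; the only difference is whether the caller blocks on it. The sketch below imitates this with plain scala.concurrent: fakeExecute is a hypothetical stand-in for client.execute { bulk(...) }, and blocking with Await.result and a timeout is, as far as I can tell, roughly what .await does.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object SyncAsyncSketch {
  // Hypothetical stand-in for client.execute { bulk(...) }:
  // completes later with the number of documents "written".
  def fakeExecute(docs: Seq[String]): Future[Int] = Future {
    Thread.sleep(50) // simulate network latency
    docs.size
  }

  // Synchronous style: block the current thread until the result arrives
  // (throws a TimeoutException if it takes longer than 10 seconds).
  def writeSync(docs: Seq[String]): Int =
    Await.result(fakeExecute(docs), 10.seconds)

  // Asynchronous style: return immediately; the caller reacts when the Future completes.
  def writeAsync(docs: Seq[String]): Future[Int] =
    fakeExecute(docs)
}
```

The synchronous form is simpler in scripts and batch jobs; the asynchronous form keeps the calling thread free but needs an ExecutionContext and care about when the client is closed.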

The choice between the two styles can lead to the following problems:

2.5.1 Asynchronous calls report a missing import

Error message:

Cannot find an implicit ExecutionContext. You might pass an (implicit ec: ExecutionContext) parameter to your method or import scala.concurrent.ExecutionContext.Implicits.global.

Solution:

import scala.concurrent.ExecutionContext.Implicits.global
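The error message also suggests a second option: instead of importing the global ExecutionContext inside library code, let the method take an implicit ExecutionContext and leave the choice to the caller. A minimal sketch with hypothetical names (EcParamSketch, describe, EcParamUsage), using only scala.concurrent:

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object EcParamSketch {
  // Requires an ExecutionContext from the caller, as the compiler message suggests;
  // map and recover both need one to schedule their callbacks.
  def describe(result: Future[Int])(implicit ec: ExecutionContext): Future[String] =
    result
      .map(n => s"wrote $n docs")
      .recover { case e => s"failed: ${e.getMessage}" }
}

object EcParamUsage {
  // The caller picks the ExecutionContext; here it imports the global one.
  import scala.concurrent.ExecutionContext.Implicits.global

  def run(result: Future[Int]): String =
    Await.result(EcParamSketch.describe(result), 5.seconds)
}
```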

2.5.2 Closing the connection right after an asynchronous call leaves the data unwritten

If you use the asynchronous style like this:

client.execute {bulk}
client.close()  // runs immediately, before the request above has finished

then the call is cut off before it completes. No error or message is reported, yet the data is never actually written to ES.

Solution:

Much as with an ajax callback, you can release the resources once the call has completed:

client.execute {bulk}
      .onComplete({
          case _ => {
              client.close()
          }
      })
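A variant of the same idea, sketched with a hypothetical FakeClient in place of the real elastic4s client: andThen runs the cleanup once the request has finished and returns a Future that completes only after that cleanup, and matching Success and Failure reports a failed write instead of silently swallowing it with case _.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

// Hypothetical stand-in for an elastic4s client; it only records whether close() was called.
final class FakeClient {
  @volatile var closed = false
  def execute(docs: Seq[String]): Future[Int] = Future { docs.size }
  def close(): Unit = { closed = true }
}

object CloseAfterComplete {
  // Close the client only after the request has completed, successfully or not.
  def writeThenClose(client: FakeClient, docs: Seq[String]): Future[Int] =
    client.execute(docs).andThen {
      case Success(n) =>
        println(s"indexed $n docs")
        client.close()
      case Failure(e) =>
        println(s"write failed: ${e.getMessage}")
        client.close()
    }
}
```

Depending on whether the client's own threads keep the JVM alive, a short-lived program may also need to block on the returned Future before exiting.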

2.6 Date format issues

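According to the material I consulted, indexing automatically detects strings in the formats yyyy-MM-dd HH:mm:ss Z and yyyy-MM-dd Z as dates. In my own experiments with version 5.6, however, the string formats recognized as dates by default were yyyy-MM-dd HH:mm:ss and yyyy-MM-dd.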
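When building documents in Scala, one way to make sure date fields arrive as strings in the formats that were detected in this 5.6 experiment is to format them explicitly with java.time. A sketch; EsDateFormats is a hypothetical name. Since automatic date detection depends on the version and on the index's dynamic_date_formats setting, declaring the field as type date with an explicit format in the mapping is the more robust route.

```scala
import java.time.{LocalDate, LocalDateTime}
import java.time.format.DateTimeFormatter

object EsDateFormats {
  // The two patterns recognized as dates by default in the 5.6 experiment above
  val DateTimePattern: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  val DatePattern: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")

  def formatDateTime(t: LocalDateTime): String = t.format(DateTimePattern)
  def formatDate(d: LocalDate): String = d.format(DatePattern)

  def main(args: Array[String]): Unit = {
    // e.g. field values to pass to indexInto(...).fields(...)
    val fields = Map(
      "country" -> "Mongolia",
      "updated" -> formatDateTime(LocalDateTime.of(2017, 10, 1, 8, 30, 5))
    )
    println(fields)
  }
}
```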

References

  1. Getting availableProcessors is already set to [1], rejecting [1] IllegalStateException exception - Elasticsearch - Discuss the Elastic Stack (https://discuss.elastic.co/t/...)
  2. Elasticsearch 5.4.1 - availableProcessors is already set - Elasticsearch - Discuss the Elastic Stack
  3. How do I enable remote access/request in Elasticsearch 2.0? - Stack Overflow
  4. networking - What's the difference between bind_host and publish_host in ElasticSearch? - Stack Overflow
  5. NoNodeAvailableException None of the configured nodes are available: {127.0.0.1:9300} · Issue 972 · sksamuel/elastic4s · GitHub
  6. java - NoNodeAvailableException : None of the configured nodes are available - Stack Overflow
  7. Exception thrown when initializing TransportClient in elasticsearch 5.5 - CSDN Blog
  8. Exception in thread "main" java.lang.NoSuchMethodError: com.fasterxml.jackson.core.JsonFactory.requiresPropertyOrdering()Z - Stack Overflow
  9. Date datatype | Elasticsearch Reference 5.6 | Elastic