search(3)- elastic4s-QueryDSL

  elastic4s是elasticsearch一个第三方开发的scala语言终端工具库(Elastic4s is a concise, idiomatic, reactive, type safe Scala client for Elasticsearch.)。scala用户能够用elastic4s提供的DSL用编程代码形式来构建ES服务请求。与字符型json文本直接编写请求不一样的是:在编译DSL编写的ES服务请求时能够发现不管是语法上或者语意上的错误。通常来说:elastic4s的程序流程相对直接、简单,以下:react

 client.execute { indexInto("books" ).fields("title" -> "重庆火锅的十种吃法", "content" -> "在这部书里描述了火锅的各类烹饪方式") }.await val response = client.execute { search("books").matchQuery("title", "火锅") }.await ... ...

一项ES操做服务从构建请求到具体运行都是在execute(T)这个函数里进行的。值得注意的是这个T类型在上面的例子里能够是IndexRequest或者SearchRequest,以下:编程

   def indexInto(index: Index): IndexRequest
...
   def search(index: String): SearchRequest

实际上execute(T)的T表明elastic4s支持的全部ES操做类型。这种方法实现有赖于scala的typeclass模式。咱们先看看execute函数定义:json

  // Executes the given request type T, and returns an effect of Response[U] // where U is particular to the request type. // For example a search request will return a Response[SearchResponse].
  def execute[T, U, F[_]](t: T)(implicit executor: Executor[F], functor: Functor[F], handler: Handler[T, U], manifest: Manifest[U]): F[Response[U]] = { val request = handler.build(t) val f = executor.exec(client, request) functor.map(f) { resp => handler.responseHandler.handle(resp) match { case Right(u) => RequestSuccess(resp.statusCode, resp.entity.map(_.content), resp.headers, u) case Left(error) => RequestFailure(resp.statusCode, resp.entity.map(_.content), resp.headers, error) } } }

这个函数比较重要的功能之一应该是构建服务请求了。这个功能是经过handler.build(t)实现的。handler: Handler[T,U]是个隐式参数,它就是一个typeclass: app

/** * A [[Handler]] is a typeclass used by the [[ElasticClient]] in order to * create [[ElasticRequest]] instances which are sent to the elasticsearch * server, as well as returning a [[ResponseHandler]] which handles the * response from the server. * * @tparam T the type of the request object handled by this handler * @tparam U the type of the response object returned by this handler */
abstract class Handler[T, U: Manifest] extends Logging { def responseHandler: ResponseHandler[U] = ResponseHandler.default[U] def build(t: T): ElasticRequest }

这个抽象类中有两个函数,其中一个就是build(t: T):ElasticRequest,也是个抽象方法,必须在构建实例时实现。在execute(T)中handler是一个隐式参数,也就是说若是在调用这个函数的可视域内能发现Handler[T,U]实例,则可获取handler,而后可调用handler.build(t)来构建请求。这个T类型是便是调用execute(T)这个T类型了,上面说过T能够是ES的任何操做类型,也就是说若是这些操做类型都继承了Handler[T,U],那么必须按照要求实现build(t:T)来构建该操做类型所需的服务请求ElasticRequest。下面就是例子里两个操做类型须要的隐式实例:elasticsearch

 implicit object IndexHandler extends Handler[IndexRequest, IndexResponse] { override def responseHandler: ResponseHandler[IndexResponse] = new ResponseHandler[IndexResponse] { override def handle(response: HttpResponse): Either[ElasticError, IndexResponse] = response.statusCode match { case 201 | 200                   => Right(ResponseHandler.fromResponse[IndexResponse](response)) case 400 | 401 | 403 | 409 | 500 => Left(ElasticError.parse(response)) case _                           => sys.error(response.toString) } } override def build(request: IndexRequest): ElasticRequest = { val (method, endpoint) = request.id match { case Some(id) =>
          "PUT" -> s"/${URLEncoder.encode(request.index.name, StandardCharsets.UTF_8.name())}/_doc/${URLEncoder.encode(id.toString, StandardCharsets.UTF_8.name())}"
        case None =>
          "POST" -> s"/${URLEncoder.encode(request.index.name, StandardCharsets.UTF_8.name())}/_doc" } val params = scala.collection.mutable.Map.empty[String, String] request.createOnly.foreach( createOnly =>
          if (createOnly) params.put("op_type", "create") ) request.routing.foreach(params.put("routing", _)) request.parent.foreach(params.put("parent", _)) request.timeout.foreach(params.put("timeout", _)) request.pipeline.foreach(params.put("pipeline", _)) request.refresh.map(RefreshPolicyHttpValue.apply).foreach(params.put("refresh", _)) request.version.map(_.toString).foreach(params.put("version", _)) request.ifPrimaryTerm.map(_.toString).foreach(params.put("if_primary_term", _)) request.ifSeqNo.map(_.toString).foreach(params.put("if_seq_no", _)) request.versionType.map(VersionTypeHttpString.apply).foreach(params.put("version_type", _)) val body = IndexContentBuilder(request) val entity = ByteArrayEntity(body.getBytes, Some("application/json")) logger.debug(s"Endpoint=$endpoint") ElasticRequest(method, endpoint, params.toMap, entity) } } ... implicit object SearchHandler extends Handler[SearchRequest, SearchResponse] { override def build(request: SearchRequest): ElasticRequest = { val endpoint =
        if (request.indexes.values.isEmpty) "/_all/_search"
        else
          "/" + request.indexes.values .map(URLEncoder.encode(_, "UTF-8")) .mkString(",") + "/_search" val params = scala.collection.mutable.Map.empty[String, String] request.requestCache.map(_.toString).foreach(params.put("request_cache", _)) request.searchType .filter(_ != SearchType.DEFAULT) .map(SearchTypeHttpParameters.convert) .foreach(params.put("search_type", _)) request.routing.map(_.toString).foreach(params.put("routing", _)) request.pref.foreach(params.put("preference", _)) request.keepAlive.foreach(params.put("scroll", _)) request.allowPartialSearchResults.map(_.toString).foreach(params.put("allow_partial_search_results", _)) request.batchedReduceSize.map(_.toString).foreach(params.put("batched_reduce_size", _)) request.indicesOptions.foreach { opts => IndicesOptionsParams(opts).foreach { case (key, value) => params.put(key, value) } } request.typedKeys.map(_.toString).foreach(params.put("typed_keys", _)) val body = request.source.getOrElse(SearchBodyBuilderFn(request).string()) ElasticRequest("POST", endpoint, params.toMap, HttpEntity(body, "application/json")) } }

以上IndexHandler, SearchHandler就是针对index,search操做的Handler[T,U]隐式实例。它们的build(t:T)函数分别按传入的T类型参数构建了各自要求格式的服务请求。ide

我老是觉着:不必定全部类型的服务请求都适合用DSL来构建,好比多层逻辑条件的json,可能不容易用DSL来实现(我我的的顾虑)。那么应该有个接口直接json文本嵌入request-entity。elastic4s在各类操做类型的服务请求类型如IndexRequest, SearchRequest,BulkRequest等提供了source:Option[String]字段接收json文本,以下:函数

case class IndexRequest(index: Index, ... source: Option[String] = None) extends BulkCompatibleRequest { ... def source(json: String): IndexRequest = copy(source = json.some) ... } case class SearchRequest(indexes: Indexes, ... source: Option[String] = None, ... typedKeys: Option[Boolean] = None) { ... /** * Sets the source of the request as a json string. Note, if you use this method * any other body-level settings will be ignored. * * HTTP query-parameter settings can still be used, eg limit, routing, search type etc. * * Unlike rawQuery, source is parsed at the "root" level * Query must be valid json beginning with '{' and ending with '}'. * Field names must be double quoted. * * NOTE: This method only works with the HTTP client. * * Example: * {{{ * search in "*" limit 5 source { * """{ "query": { "prefix": { "bands": { "prefix": "coldplay", "boost": 5.0, "rewrite": "yes" } } } }""" * } searchType SearchType.Scan * }}} */ def source(json: String): SearchRequest = copy(source = json.some) ... }

如今,咱们能够直接用json文本了:工具

  val json =
    """       |{ |  "query" : { |    "match" : {"title" : "火锅"} | } |} |""".stripMargin
  val response = client.execute { search("books").source(json)   // .matchQuery("title", "火锅")
  }.await
相关文章
相关标签/搜索