search(4)- elastic4s-ElasticDsl

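Last time I analyzed the execution framework of elastic4s. I had planned to move straight on to concrete function-call demonstrations, but after reading through all of the elastic4s usage documentation I felt I was going a little too fast. The main reason is that Elasticsearch changed significantly after 7.0. Although the elastic4s source code has kept pace with ES, its documentation has not been updated, so the documentation cannot be relied on to learn how to use elastic4s; the usage has to be worked out from the source. I spent some time going through the elastic4s source and still find the library quite useful, for two reasons. First, generating JSON requests programmatically is flexible, and the compiler can check that the requests are well formed. Second, it helps with processing search results: the results come back as long, messy, complex JSON, and I would not want to parse that JSON by hand just to reach the values I need. elastic4s provides a fairly complete set of response classes, which should make this convenient. The programming model of elastic4s and its use of the Scala language are also worth studying. Given all this, I plan to build a complete set of demonstrations with elastic4s, covering index creation, index maintenance, search, and aggregation, which should help a lot in understanding and mastering the library.

Before that, let's review how elastic4s works. Its job is actually simple: compose DSL statements into a JSON request, send the request to the ES REST endpoint, then process the returned JSON and pick out the target result.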

In the previous post we discussed the basic execution framework of elastic4s:

    client.execute(
      createIndex("company")
        .shards(2).replicas(3)
    )

    ...

    val bookschema = putMapping("books")
      .as(
        KeywordField("isbn"),
        textField("title"),
        doubleField("price")
      )

    client.execute(
      bookschema
    )

    ...

    val futAccts = client.execute(
      search("bank").termQuery("city" -> "dante")
    )
    futAccts.onComplete {
      case Success(esresp) =>
        esresp.result.hits.hits.foreach(h => println(h.sourceAsMap))
      case Failure(err) => println(s"search error: ${err.getMessage}")
    }

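In fact, the T in execute(T) stands for any of the ES operation types that elastic4s supports. This relies on Scala's typeclass pattern. Let's first look at the definition of the execute function: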

    // Executes the given request type T, and returns an effect of Response[U]
    // where U is particular to the request type.
    // For example a search request will return a Response[SearchResponse].
    def execute[T, U, F[_]](t: T)(implicit executor: Executor[F],
                                  functor: Functor[F],
                                  handler: Handler[T, U],
                                  manifest: Manifest[U]): F[Response[U]] = {
      val request = handler.build(t)
      val f = executor.exec(client, request)
      functor.map(f) { resp =>
        handler.responseHandler.handle(resp) match {
          case Right(u) => RequestSuccess(resp.statusCode, resp.entity.map(_.content), resp.headers, u)
          case Left(error) => RequestFailure(resp.statusCode, resp.entity.map(_.content), resp.headers, error)
        }
      }
    }
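To make the typeclass dispatch in execute easier to follow, here is a minimal, self-contained sketch of the same idea with no elastic4s dependency. Every name in it (TypeclassSketch, MiniRequest, MiniHandler, CreateIndex, Search, fakeSend) is hypothetical and chosen only for illustration, the JSON bodies are simplified, and the effect type F[_] and the HTTP client are left out. It is a sketch of the pattern, not the library's implementation.

```scala
object TypeclassSketch {
  // Simplified stand-in for ElasticRequest: HTTP method, endpoint and JSON body.
  final case class MiniRequest(method: String, endpoint: String, body: String)

  // The typeclass: how to build a request from a T and decode the raw reply into a U.
  trait MiniHandler[T, U] {
    def build(t: T): MiniRequest
    def handle(statusCode: Int, raw: String): Either[String, U]
  }

  // Two hypothetical operation types.
  final case class CreateIndex(name: String, shards: Int)
  final case class Search(index: String, field: String, value: String)

  implicit val createIndexHandler: MiniHandler[CreateIndex, Boolean] =
    new MiniHandler[CreateIndex, Boolean] {
      def build(t: CreateIndex): MiniRequest =
        MiniRequest("PUT", "/" + t.name,
          s"""{"settings":{"index":{"number_of_shards":${t.shards}}}}""")
      def handle(statusCode: Int, raw: String): Either[String, Boolean] =
        if (statusCode == 200 || statusCode == 201) Right(true) else Left(raw)
    }

  implicit val searchHandler: MiniHandler[Search, String] =
    new MiniHandler[Search, String] {
      def build(t: Search): MiniRequest =
        MiniRequest("POST", s"/${t.index}/_search",
          s"""{"query":{"term":{"${t.field}":"${t.value}"}}}""")
      def handle(statusCode: Int, raw: String): Either[String, String] =
        if (statusCode == 200) Right(raw) else Left(raw)
    }

  // Fake transport standing in for the HTTP client: always answers 200
  // and echoes the method and endpoint it was given.
  def fakeSend(req: MiniRequest): (Int, String) =
    (200, s"ok ${req.method} ${req.endpoint}")

  // One generic entry point. The compiler selects the handler, and with it
  // the result type U, from the static type of the argument t.
  def execute[T, U](t: T)(implicit handler: MiniHandler[T, U]): Either[String, U] = {
    val req = handler.build(t)
    val (status, raw) = fakeSend(req)
    handler.handle(status, raw)
  }
}
```

Calling TypeclassSketch.execute(CreateIndex("company", 2)) yields an Either[String, Boolean], while passing a Search yields an Either[String, String]. Passing a type that has no handler in implicit scope is rejected at compile time, which is the property the real execute relies on.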

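As mentioned in the previous post, Handler[T, U] is a typeclass that represents how the JSON request is built for each different type T. elastic4s provides the handlers for these T types as follows: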

    trait ElasticDsl
        extends ElasticApi
        with Logging
        with ElasticImplicits
        with BulkHandlers
        with CatHandlers
        with CountHandlers
        with ClusterHandlers
        with DeleteHandlers
        with ExistsHandlers
        with ExplainHandlers
        with GetHandlers
        with IndexHandlers
        with IndexAdminHandlers
        with IndexAliasHandlers
        with IndexStatsHandlers
        with IndexTemplateHandlers
        with LocksHandlers
        with MappingHandlers
        with NodesHandlers
        with ReindexHandlers
        with RoleAdminHandlers
        with RoleHandlers
        with RolloverHandlers
        with SearchHandlers
        with SearchTemplateHandlers
        with SearchScrollHandlers
        with SettingsHandlers
        with SnapshotHandlers
        with UpdateHandlers
        with TaskHandlers
        with TermVectorHandlers
        with UserAdminHandlers
        with UserHandlers
        with ValidateHandlers {

      implicit class RichRequest[T](t: T) {
        def request(implicit handler: Handler[T, _]): ElasticRequest = handler.build(t)
        def show(implicit handler: Handler[T, _]): String = Show[ElasticRequest].show(handler.build(t))
      }
    }

    object ElasticDsl extends ElasticDsl
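The RichRequest implicit class above adds request and show to any value that has a handler in scope. The following is a small self-contained sketch of that enrichment pattern. The names RichRequestSketch, MiniRequest, MiniHandler and DeleteIndex are hypothetical, and show here simply prints the method and endpoint rather than using the library's Show typeclass.

```scala
object RichRequestSketch {
  // Simplified stand-in for ElasticRequest.
  final case class MiniRequest(method: String, endpoint: String, body: String)

  // Simplified single-parameter handler: only the request-building half.
  trait MiniHandler[T] {
    def build(t: T): MiniRequest
  }

  // A hypothetical operation type with its implicit handler instance.
  final case class DeleteIndex(name: String)

  implicit val deleteIndexHandler: MiniHandler[DeleteIndex] =
    new MiniHandler[DeleteIndex] {
      def build(t: DeleteIndex): MiniRequest = MiniRequest("DELETE", "/" + t.name, "")
    }

  // Extension methods: usable on any T, but they only compile
  // when a MiniHandler[T] can be found in implicit scope.
  implicit class RichRequest[T](private val t: T) {
    def request(implicit handler: MiniHandler[T]): MiniRequest = handler.build(t)

    def show(implicit handler: MiniHandler[T]): String = {
      val r = handler.build(t)
      s"${r.method} ${r.endpoint}"
    }
  }
}
```

After import RichRequestSketch._, the expression DeleteIndex("company").show builds the request through the handler and renders it without sending anything, which is handy for inspecting what a DSL expression would produce.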

All of the operation APIs are here:

    trait ElasticApi
        extends AliasesApi
        with ElasticImplicits
        with AggregationApi
        with AnalyzerApi
        with BulkApi
        with CatsApi
        with CreateIndexApi
        with ClearRolesCacheApi
        with ClusterApi
        with CollapseApi
        with CountApi
        with CreateRoleApi
        with CreateUserApi
        with DeleteApi
        with DeleteIndexApi
        with DeleteRoleApi
        with DeleteUserApi
        with DynamicTemplateApi
        with ExistsApi
        with ExplainApi
        with ForceMergeApi
        with GetApi
        with HighlightApi
        with IndexApi
        with IndexAdminApi
        with IndexRecoveryApi
        with IndexTemplateApi
        with LocksApi
        with MappingApi
        with NodesApi
        with NormalizerApi
        with QueryApi
        with PipelineAggregationApi
        with ReindexApi
        with RoleApi
        with ScriptApi
        with ScoreApi
        with ScrollApi
        with SearchApi
        with SearchTemplateApi
        with SettingsApi
        with SnapshotApi
        with SortApi
        with SuggestionApi
        with TaskApi
        with TermVectorApi
        with TokenizerApi
        with TokenFilterApi
        with TypesApi
        with UpdateApi
        with UserAdminApi
        with UserApi
        with ValidateApi {

      implicit class RichFuture[T](future: Future[T]) {
        def await(implicit duration: Duration = 60.seconds): T = Await.result(future, duration)
      }
    }

    object ElasticApi extends ElasticApi
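The RichFuture class at the end of ElasticApi adds a blocking await to any Future. The sketch below reproduces that helper with only the standard library so its behavior can be tried in isolation. RichFutureSketch and demo are hypothetical names, and the global execution context is assumed just to keep the example short.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object RichFutureSketch {
  // Same shape as the helper in ElasticApi: block the calling thread
  // until the future completes or the timeout elapses.
  implicit class RichFuture[T](future: Future[T]) {
    def await(implicit duration: Duration = 60.seconds): T =
      Await.result(future, duration)
  }

  // No implicit Duration is in scope here, so the 60 second default applies.
  def demo(): Int = Future(21 * 2).await
}
```

Because Await.result blocks the calling thread, a helper like this is mostly suited to demos, scripts and tests. In application code, composing the Future with map, flatMap or onComplete, as in the search example earlier, avoids the blocking.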

With import ElasticDsl._, the API methods and handlers for every operation type are brought into scope. Here are the API methods used in the example:

    trait CreateIndexApi {
      def createIndex(name: String): CreateIndexRequest = CreateIndexRequest(name)
      ...
    }

    trait MappingApi {
      ...
      def putMapping(indexes: Indexes): PutMappingRequest = PutMappingRequest(IndexesAndType(indexes))
    }

    trait SearchApi {
      def search(index: String): SearchRequest = search(Indexes(index))
      ...
    }
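Each API method above only constructs a request value. Chained calls such as .shards(2).replicas(3) then refine it. A minimal sketch of how such a fluent DSL can be built on an immutable case class follows. MiniCreateIndexRequest, MiniCreateIndexApi, MiniDsl and the field names numShards and numReplicas are hypothetical, and the real CreateIndexRequest carries many more fields.

```scala
object RequestBuilderSketch {
  // Immutable request description: each setter returns a modified copy.
  final case class MiniCreateIndexRequest(
      name: String,
      numShards: Option[Int] = None,
      numReplicas: Option[Int] = None
  ) {
    def shards(n: Int): MiniCreateIndexRequest = copy(numShards = Some(n))
    def replicas(n: Int): MiniCreateIndexRequest = copy(numReplicas = Some(n))
  }

  // The API trait only knows how to start a request.
  trait MiniCreateIndexApi {
    def createIndex(name: String): MiniCreateIndexRequest = MiniCreateIndexRequest(name)
  }

  // Mixing the API traits into one object gives a single import point,
  // in the same spirit as object ElasticDsl extends ElasticDsl.
  object MiniDsl extends MiniCreateIndexApi
}
```

With import RequestBuilderSketch.MiniDsl._, the expression createIndex("company").shards(2).replicas(3) evaluates to a plain value describing the request. Nothing is sent until a handler turns that value into an HTTP request.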

CreateIndexRequest, PutMappingRequest and SearchRequest each come with an implicit handler instance:

    trait IndexAdminHandlers {
      ...
      implicit object CreateIndexHandler extends Handler[CreateIndexRequest, CreateIndexResponse] {

        override def responseHandler: ResponseHandler[CreateIndexResponse] =
          new ResponseHandler[CreateIndexResponse] {
            override def handle(response: HttpResponse): Either[ElasticError, CreateIndexResponse] =
              response.statusCode match {
                case 200 | 201 => Right(ResponseHandler.fromResponse[CreateIndexResponse](response))
                case 400 | 500 => Left(ElasticError.parse(response))
                case _         => sys.error(response.toString)
              }
          }

        override def build(request: CreateIndexRequest): ElasticRequest = {
          val endpoint = "/" + URLEncoder.encode(request.name, "UTF-8")

          val params = scala.collection.mutable.Map.empty[String, Any]
          request.waitForActiveShards.foreach(params.put("wait_for_active_shards", _))
          request.includeTypeName.foreach(params.put("include_type_name", _))

          val body = CreateIndexContentBuilder(request).string()
          val entity = HttpEntity(body, "application/json")

          ElasticRequest("PUT", endpoint, params.toMap, entity)
        }
      }
    }

    ...

    trait MappingHandlers {
      ...
      implicit object PutMappingHandler extends Handler[PutMappingRequest, PutMappingResponse] {

        override def build(request: PutMappingRequest): ElasticRequest = {
          val endpoint =
            s"/${request.indexesAndType.indexes.mkString(",")}/_mapping${request.indexesAndType.`type`.map("/" + _).getOrElse("")}"

          val params = scala.collection.mutable.Map.empty[String, Any]
          request.updateAllTypes.foreach(params.put("update_all_types", _))
          request.ignoreUnavailable.foreach(params.put("ignore_unavailable", _))
          request.allowNoIndices.foreach(params.put("allow_no_indices", _))
          request.expandWildcards.foreach(params.put("expand_wildcards", _))
          request.includeTypeName.foreach(params.put("include_type_name", _))

          val body = PutMappingBuilderFn(request).string()
          val entity = HttpEntity(body, "application/json")

          ElasticRequest("PUT", endpoint, params.toMap, entity)
        }
      }
    }

    ...

    trait SearchHandlers {
      ...
      implicit object SearchHandler extends Handler[SearchRequest, SearchResponse] {

        override def build(request: SearchRequest): ElasticRequest = {
          val endpoint =
            if (request.indexes.values.isEmpty) "/_all/_search"
            else
              "/" + request.indexes.values
                .map(URLEncoder.encode(_, "UTF-8"))
                .mkString(",") + "/_search"

          val params = scala.collection.mutable.Map.empty[String, String]
          request.requestCache.map(_.toString).foreach(params.put("request_cache", _))
          request.searchType
            .filter(_ != SearchType.DEFAULT)
            .map(SearchTypeHttpParameters.convert)
            .foreach(params.put("search_type", _))
          request.routing.map(_.toString).foreach(params.put("routing", _))
          request.pref.foreach(params.put("preference", _))
          request.keepAlive.foreach(params.put("scroll", _))
          request.allowPartialSearchResults.map(_.toString).foreach(params.put("allow_partial_search_results", _))
          request.batchedReduceSize.map(_.toString).foreach(params.put("batched_reduce_size", _))

          request.indicesOptions.foreach { opts =>
            IndicesOptionsParams(opts).foreach { case (key, value) => params.put(key, value) }
          }

          request.typedKeys.map(_.toString).foreach(params.put("typed_keys", _))

          val body = request.source.getOrElse(SearchBodyBuilderFn(request).string())
          ElasticRequest("POST", endpoint, params.toMap, HttpEntity(body, "application/json"))
        }
      }
    }
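The build methods of these handlers share two small steps: deriving the endpoint from the index names, and collecting only those query parameters that were actually set. The sketch below isolates those two steps using only the standard library. EndpointSketch, searchEndpoint and searchParams are hypothetical names, and only two of the optional parameters are modelled.

```scala
import java.net.URLEncoder

object EndpointSketch {
  // Mirrors the endpoint logic of the search handler: no index means "_all",
  // otherwise URL-encode each index name and join the names with commas.
  def searchEndpoint(indexes: Seq[String]): String =
    if (indexes.isEmpty) "/_all/_search"
    else "/" + indexes.map(URLEncoder.encode(_, "UTF-8")).mkString(",") + "/_search"

  // Optional settings are modelled as Option values. Option.foreach adds
  // a key only when a value is present, so unset options never reach the URL.
  def searchParams(requestCache: Option[Boolean], routing: Option[String]): Map[String, String] = {
    val params = scala.collection.mutable.Map.empty[String, String]
    requestCache.map(_.toString).foreach(params.put("request_cache", _))
    routing.foreach(params.put("routing", _))
    params.toMap
  }
}
```

For example, EndpointSketch.searchEndpoint(Seq("bank", "books")) gives "/bank,books/_search", and EndpointSketch.searchParams(Some(true), None) gives a map containing only request_cache.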