【13】把 Elasticsearch 当数据库使:Join

使用 https://github.com/taowen/es-monitor 能够用 SQL 进行 elasticsearch 的查询。要真正把Elasticsearch看成数据库来使,Join是一个绕不过的话题。关于Elasticsearch如何支持join,这个slide总结得很好:http://www.slideshare.net/sirensolutions/searching-relational-data-with-elasticsearch。整体来讲有这么几种方式:python

  • 彻底不join,把关联表的字段融合到一张表里。固然这会形成数据的冗余git

  • 录入的时候join:使用 nested documents(nested document和主文档是同segment存储的,对于一个symbol,几千万个quote这样的场景就不适合了)github

  • 录入的时候join:使用 sirensql

  • 查询时join:使用 parent/child (这个是elasticsearch的特性,要求parent/child同shard存在)数据库

  • 查询时join:使用 siren-joins(就是一个在服务端求值的filter,而后把结果发布给每一个shard去作二次match)服务器

  • 查询时join:在客户端拼装第二个查询(和siren-joins差很少,可是多了一次客户端到服务器的来回)elasticsearch

  • 查询时join:在coordinate节点上作两个查询的join合并(https://github.com/NLPchina/elasticsearch-sql分布式

我我的喜欢的是siren-joins和客户端拼装这两种方案。这两种方案都是先作了一次查询,把查询结果再次分发到每一个分布式节点上再次去作分布式的聚合。相比在coordinate节点上去作join合并更scalable。ide

客户端求值

首先我来看如何在客户端完成结果集的求值ui

$ cat << EOF | python2.6 es_query.py http://127.0.0.1:9200
    SELECT symbol FROM symbol WHERE sector='Finance' LIMIT 1000;
    SAVE RESULT AS finance_symbols;
EOF

这里引入的 SAVE RESULT AS 就是用于触发前面的SQL的求值,并把结果集命名为 finance_symbols。若是由于一些中间结果咱们不须要,咱们也能够用REMOVE 命令把求值结果删除

$ cat << EOF | python2.6 es_query.py http://127.0.0.1:9200
    SELECT symbol FROM symbol WHERE sector='Finance' LIMIT 1000;
    SAVE RESULT AS finance_symbols;
    REMOVE RESULT finance_symbols;
EOF

甚至咱们可使用任意的python代码来修改result_map。

$ cat << EOF | python2.6 es_query.py http://127.0.0.1:9200
    SELECT symbol FROM symbol WHERE sector='Finance' LIMIT 1000;
    SAVE RESULT AS finance_symbols;
    result_map['finance_symbols'] = result_map['finance_symbols'][1:-1];
EOF

客户端Join

在客户端求值的基础上,咱们能够利用客户端保留的结果集来发第二个请求。

cat << EOF | python2.6 es_query.py http://127.0.0.1:9200
    SELECT symbol FROM symbol WHERE sector='Finance' LIMIT 5;
    SAVE RESULT AS finance_symbols;
    SELECT MAX(adj_close) FROM quote 
        JOIN finance_symbols ON quote.symbol = finance_symbols.symbol;
    REMOVE RESULT finance_symbols;
EOF

这个产生的Elaticsearch请求是这样的两条:

{
  "query": {
    "term": {
      "sector": "Finance"
    }
  }, 
  "size": 5
}

而后根据其返回,产生了第二个请求

{
  "query": {
    "bool": {
      "filter": [
        {}, 
        {
          "terms": {
            "symbol": [
              "TFSC", 
              "TFSCR", 
              "TFSCU", 
              "TFSCW", 
              "PIH"
            ]
          }
        }
      ]
    }
  }, 
  "aggs": {
    "MAX(adj_close)": {
      "max": {
        "field": "adj_close"
      }
    }
  }, 
  "size": 0
}

能够看到,所谓客户端join,就是用前一次的查询结果拼出了第二次查询的条件(terms filter)。

服务端Join

有了 siren-join 插件(https://github.com/sirensolutions/siren-join),咱们能够在服务端完成一样的join操做

cat << EOF | python2.6 es_query.py http://127.0.0.1:9200
    WITH finance_symbols AS (SELECT symbol FROM symbol WHERE sector='Finance' LIMIT 5);
    SELECT MAX(adj_close) FROM quote 
        JOIN finance_symbols ON quote.symbol = finance_symbols.symbol;
EOF

前面第一个查询是用SAVE RESULT AS求值并命名为finance_symbols,这里咱们并无求值而是给其取了一个名字(WITH AS),而后就能够引用了。

{
  "query": {
    "bool": {
      "filter": [
        {}, 
        {
          "filterjoin": {
            "symbol": {
              "indices": "symbol*", 
              "path": "symbol", 
              "query": {
                "term": {
                  "sector": "Finance"
                }
              }
            }
          }
        }
      ]
    }
  }, 
  "aggs": {
    "MAX(adj_close)": {
      "max": {
        "field": "adj_close"
      }
    }
  }, 
  "size": 0
}

可见产生的filterjoin把两步合为一步了。注意对于filterjoin查询,须要POST _coordinate_search 而不是_search这个URL。
Profile

[
  {
    "query": [
      {
        "query_type": "BoostQuery",
        "lucene": "ConstantScore(BytesFieldDataTermsQuery::[size=8272])^0.0",
        "time": "29.32334300ms",
        "breakdown": {
          "score": 0,
          "create_weight": 360426,
          "next_doc": 137906,
          "match": 0,
          "build_scorer": 15027540,
          "advance": 0
        },
        "children": [
          {
            "query_type": "BytesFieldDataTermsQuery",
            "lucene": "BytesFieldDataTermsQuery::[size=8272]",
            "time": "13.79747100ms",
            "breakdown": {
              "score": 0,
              "create_weight": 14903,
              "next_doc": 168010,
              "match": 0,
              "build_scorer": 13614558,
              "advance": 0
            }
          }
        ]
      }
    ],
    "rewrite_time": 30804,
    "collector": [
      {
        "name": "MultiCollector",
        "reason": "search_multi",
        "time": "1.529236000ms",
        "children": [
          {
            "name": "TotalHitCountCollector",
            "reason": "search_count",
            "time": "0.08967800000ms"
          },
          {
            "name": "MaxAggregator: [MAX(adj_close)]",
            "reason": "aggregation",
            "time": "0.1675550000ms"
          }
        ]
      }
    ]
  }
]

从profile的结果来看,其原理也是 terms filter(BytesFieldDataTermsQuery)。因此这也就决定了这种join只是伪join。真正的join不单单能够用第一个表去filter第二个表,并且要可以在第二个查询的计算阶段引用第一个阶段的结果。这个是仅仅用terms filter没法完成的。固然全部这些join的努力仅仅是让数据维护变得更加容易而已,若是咱们真的要求Elasticsearch的join和传统SQL同样强大,那么咱们也没法期望那么复杂的join能够快到哪里去,也就失去了使用Elasticsearch的意义了。有了上面两种Join方式,咱们能够在极度快速和极度灵活之间得到必定的选择权利。

相关文章
相关标签/搜索