日期 | 做者 | 版本 | 备注 |
---|---|---|---|
2020-07-02 | dingbin | v1.0 | |
上一篇文章Elasticsearch7.8详尽使用指南(一):ElasticSearch集群部署实践中咱们讲述了ElasticSearch集群部署方法。Es集群已经部署好了,接下来咱们要新建索引,并展现如何构建全量和增量数据到索引中去的详尽实践方法论。html
注意:java
本文所述设计的全部的程序安装包和相关源代码均提供下载: es7.8package.rar 提取码: x4gg
解压后内容以下图:(其中esproj.zip是本文开源的es构建全量或实时索引的相关java源代码)
![]()
本文针对项目的测试数据假定来源都存储在mysql数据库中。以下图所示:node
数据库data库中的position表中共计有91644条数据。咱们以此91644条数据为例逐步demo实践若是将该表中全部数据构建到es索引中去。mysql
接下来咱们在叙述es新建索引和配置mapping的过程当中会大量地用到es提供的restful API。如下罗列最经常使用的es restful API:web
查看es集群健康状态:curl http://192.168.0.110:19200/_cat/health?
查看es集群结点:curl http://192.168.0.110:19200/_cat/nodes?v
可见有3个节点es-node1,es-node2和es-node3 。正则表达式
查看es集群有哪些索引:curl http://192.168.0.110:19200/_cat/indices?v
可见es目前没有任何索引算法
在elasticsearch7.x之前的版本中,能够这么理解: es也至关于一个数据库,均可以存储数据。咱们对照mysql存储数据须要先建立database和table,相应地,es存储数据以前,也要建立index和type,咱们简单感性地认为index和type至关于mysql的database和table。显然同mysql建立table同样,咱们要定义索引数据的schema,在es中有特定的术语叫作mapping,至关于mysql的table定义。可是这样的理解在es7.x后不成立了!!!!
如上图所示,elasticsearch 7.x版本以后已经完全去除了type的概念,elasticsearch7默认不在支持指定索引类型,默认索引类型是_doc。index下面再也不容许设置type。直接给某个index设置mapping便可。至关于index就是mysql数据库中的dabase.table了。一个index惟一对应一个 mapping且再也不支持type是es 7.x之后的正确现状。更详细的细节可参考:
https://www.elastic.co/guide/en/elasticsearch/reference/current/removal-of-types.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping.html#mappingsql
假定对应position招聘职位表数据,咱们新建的es的index名称叫:position。数据库
建立索引名称为position: curl -XPUT http://192.168.0.110:19200/position?pretty
再次查看索引:curl http://192.168.0.110:19200/_cat/indices?v
可见名称叫position的index已经被建立,它的docs.count为0,目前没有任何文档即数据。
删除索引job: curl -XDELETE http://192.168.0.110:19200/position?pretty
apache
接下来要为position 索引库新建一个mapping。下面重点叙述es的mapping即映射。
查看指定index的mapping命令:curl -XGET http://192.168.0.110:19200/position/_mapping?pretty
可见此时索引没有任何mapping。Mapping为空。
查询全部index中的mapping: curl -XGET http://192.168.0.110:19200/_all/_mapping?pretty
Es7.x 版本下为position这个index新建一个mapping的restful API语法有以下两种合法的形式:
1) 事先不单首创建index,而将index和mapping一次性在一个命令中建立,语法以下:
curl -XPUT http://192.168.0.110:19200/position/?pretty -H 'content-Type:application/json' -d ' { "mappings":{ "properties":{ "title":{ "type":"text" }, "description":{ "type":"text" }, "price":{ "type":"double" }, "onSale":{ "type":"boolean" }, "type":{ "type":"integer" }, "createDate":{ "type":"date" } } } }'
2) 分阶段建立:先单首创建index, 再基于此index建立mapping,语法以下:
curl -XPUT http://192.168.0.110:19200/position1/_mappings?pretty -H 'content-Type:application/json' -d ' { "properties":{ "title":{ "type":"text" }, "description":{ "type":"text" }, "price":{ "type":"double" }, "onSale":{ "type":"boolean" }, "type":{ "type":"integer" }, "createDate":{ "type":"date" } } }'
以上两种方式咱们建议方式1)一次性建立index和mapping。简单明了。
最后再次查看如今的index和mapping:
查看index: curl http://192.168.0.110:19200/_cat/indices?v
再次执行查看maping: curl -XGET http://192.168.0.110:19200/_all/_mapping?pretty
至此,咱们叙述了如何建立索引和其对应的字段类型定义mapping。这里描述的是建立静态mapping的方式。后面咱们将利用更加便利的索引模板的方式来建立mapping。
这里先删除postion和position1两个index以下:
接下来叙述一下es的字段数据类型。
Es支持很是多的字段数据类型:(如下叙述的都是es7.x尤为是es7.8支持的字段数据类型,低版本的es跟这个可能有些许差异,要注意!)
- text:默认会进行分词,支持模糊查询(5.x以后版本string类型已废弃,请你们使用text)。
- keyword:不进行分词;keyword类型默认开启doc_values来加速聚合排序操做,占用了大量磁盘io 如非必须能够禁用doc_values。
- number:若是只有过滤场景 用不到range查询的话,使用keyword性能更佳,另外数字类型的doc_values比字符串更容易压缩。
- array:es不须要显示定义数组类型,只须要在插入数据时用'[]'表示便可,'[]'中的元素类型需保持一致。
- range:对数据的范围进行索引;目前支持 number range、date range 、ip range。
- boolean: 只接受true、false 也能够是字符串类型的“true”、“false”
- date:支持毫秒、根据指定的format解析对应的日期格式,内部以long类型存储。
- geo_point:存储经纬度数据对。
- ip:将ip数据存储在这种数据类型中,方便后期对ip字段的模糊与范围查询。
- nested:嵌套类型,一种特殊的object类型,存储object数组,可检索内部子项。
核心数据类型如上图所示,其中须要特别说明的是:
string类的text和keyword区别必定要重视:把一个string字段指定为text类型说明该字段内容是要通过中文分词器进行分词,而后切分红多个term用于全文检索的。于此相对,若是一个string字段被指定为keyword,就说明内部建索引过程不会对该字段文本内容进行分词,它会做为一个总体被建到索引正排里去。
该数据类型主要针对支持地理坐标数据进行快速经纬度查询而支持的一种特殊的数据类型。
更详细的内容细节可参见网页:
https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
本节是咱们推荐的建立索引和索引定义mapping的方式。由于它更加便利。本文采用索引模板index templates的方式来配置索引的schema。es索引可以使用预约义的模板进行建立,这个模板称做Index templates。注意:索引模板支队全部新创建的索引有效,对已经存在的索引无效。模板设置包括settings和mappings,经过模式匹配的方式使得多个索引重用一个模板。关于索引模板的更详细细节可参考网页:https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates-v1.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-templates.html#match-mapping-type
虽然es7.8引入了新的复合索引模板,但仍然支持传统的索引模板。本文使用传统的索引模板格式(legacy index template )。
在索引模板中有一个很是重要的field就是:match_mapping_type,它表明文档字段值被字面探测识别成的数据类型,它的规则以下:
1) true 或false 被自动探测识别为boolean类型;
2) 当date_detection开关打开时,一些符合特定日期格式字符串将会被自动探测识别为date类型;
3) 全部带有小数部分的数字被自动探测识别为double类型。(注意不是float类型)
4) 不带有小数部分的数字被自动识别为long类型;(注意不是integer类型。)
5) 对于对象类型统一识别为object类型;
6) 最重要的,全部字符串统一识别为string类型;
7) *表明任意类型。
上图是一个示例。
此外,_all在7.x版本已经被copy_to所代替,可用于知足特定场景。copy_to将字段数值拷贝到目标字段,实现相似_all的做用。
注意:copy_to的目标字段不出如今_source中
当date_detection被设置为true(默认)时,凡是string类型且符合
strict_date_optional_time设置的日期格式的字段都被识别为类型date。
strict_date_optional_time默认格式以下:"strict_date_optional_time":"yyyy/MM/dd HH:mm:ss Z||yyyy/MM/dd Z"
date格式能够在put mapping的时候用 format 参数指定,若是不指定的话,则启用默认格式,是"strict_date_optional_time||epoch_millis"。这代表只接受符合"strict_date_optional_time"格式的字符串值,或者long型数字。
match表征文档的字段名称匹配什么样的模式,是一种过滤选项。默认以通配符的形式匹配:即*或?等表示匹配含义。unmatch在上述match匹配的结果集中构建排除集。
若是想支持正则表达式匹配方式,则加上match_pattern:regex便可。一个示例以下图所示:
通常来讲从新索引过程当中的会遇到的一个比较头疼的问题是必须更新你的应用,来使用另外一个索引名。索引别名正是用来解决这个问题的!
索引别名就像一个快捷方式或软链接, 能够指向一个或多个索引, 也能够给任何须要索引名的API 使用。别名带给咱们极大的灵活性,容许咱们作到:
这里有两种管理别名的途径: _alias 用于单个操做, _aliases 用于原子化多个操做。
在这一章中, 咱们假设你的应用采用一个叫 my_index 的索引。 而事实上, my_index 是一个指向当前真实索引的别名。真实的索引名将包含一个版本号: my_index_v1 , my_index_v2 等等。开始, 咱们建立一个索引 my_index_v1 , 而后将别名 my_index 指向它:
开始, 咱们建立一个索引 my_index_v1 , 而后将别名 my_index 指向它:
<1> 建立索引 my_index_v1 。
<2> 将别名 my_index 指向 my_index_v1 。
你能够检测这个别名指向哪一个索引:
或哪些别名指向这个索引:
二者都将返回下列值:
而后, 咱们决定修改索引中一个字段的映射。 固然咱们不能修改现存的映射, 索引咱们须要从新索引数据。 首先, 咱们建立有新的映射的索引 my_index_v2 。
而后咱们从将数据从 my_index_v1 迁移到 my_index_v2 。一旦咱们认为数据已经被正确的索引了, 咱们就将别名指向新的索引。
别名能够指向多个索引, 因此咱们须要在新索引中添加别名的同时从旧索引中删除它。 这个操做须要原子化, 因此咱们须要用 _aliases 操做:
这样,你的应用就从旧索引迁移到了新的,而没有停机时间。
提示:
即便你认为如今的索引设计已是完美的了,当你的应用在生产 环境使用时,仍是有可能在从此有一些改变的。因此请作好准备:在应用中使用别名而不是索引。而后你就能够在任什么时候候重建索引。别名的开销很小,应当普遍使用。
Elasticsearch的别名,就相似数据库的视图。
建立别名:
咱们为索引my_index建立一个别名my_index_alias,这样咱们对my_index_alias的操做就像对my_index的操做同样
POST /_aliases { "actions": [ { "add": { "index": "my_index", "alias": "my_index_alias" } } ] }
别名不只仅能够关联一个索引,它能聚合多个索引
咱们为索引my_index_1 和 my_index_2 建立一个别名my_index_alias,这样对my_index_alias的操做(仅限读操做),会操做my_index_1和my_index_2,相似于聚合了my_index_1和my_index_2.咱们是不能对my_index_alias进行写操做,当有多个索引时alias,不能区分到底操做哪个
POST /_aliases { "actions": [ { "add": { "index": "my_index_1", "alias": "my_index_alias" } }, { "add": { "index": "my_index_2", "alias": "my_index_alias" } } ] } GET /my_index_alias/_search { }
建立filtered的别名:
例如对于同一个index,咱们给不一样人看到不一样的数据,
如my_index有个字段是team,team字段记录了该数据是那个team的。team之间的数据是不可见的。
POST /_aliases { "actions": [ { "add": { "index": "my_index", "alias": "my_index__teamA_alias", "filter":{ "term":{ "team":"teamA" } } } }, { "add": { "index": "my_index", "alias": "my_index__teamB_alias", "filter":{ "term":{ "team":"teamB" } } } }, { "add": { "index": "my_index", "alias": "my_index__team_alias" } } ] } GET /my_index__teamA_alias/_search 只能看到teamA的数据 GET /my_index__teamB_alias/_search 只能看到teamB的数据 GET /my_index__team_alias/_search 既能看到teamA的,也能看到teamB的数据
所以在索引模板中咱们强烈建议引入索引别名,以应对将来在生产环境中必定会发生的索引升级。
下图是索引模板中设置索引别名的一个示例:
好了,到目前为止,关于索引模板的预备知识咱们基本都讲述完毕了。下面开始实战过程:
为position招聘职位表数据定义一个索引模板。
CREATE TABLE `position` ( `id` varchar(64) NOT NULL COMMENT '缂栧彿', `recruitment_id` varchar(64) NOT NULL COMMENT '鎷涜仒缂栧彿', `position_name` varchar(256) DEFAULT NULL COMMENT '鑱屼綅鍚嶇О', `student_type` varchar(256) DEFAULT NULL COMMENT '瀛﹀巻', `student` varchar(255) DEFAULT NULL, `majorName` longtext, `major` longtext COMMENT '涓撲笟', `demand_number` varchar(8) DEFAULT NULL COMMENT '闂傚倸娲犻崑鎾存叏閻熸澘鈧嘲效婢舵劕鏋?', `position_description` longtext COMMENT '鑱屼綅鎻忚堪', `city` text, `cityName` text, `college` varchar(500) DEFAULT NULL, `sut1` varchar(255) DEFAULT NULL, `major_standard` text COMMENT '标准专业', PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='招聘职位信息';
以上是招聘职位信息的 mysql数表定义。
为了实现对该数据表实时数据的索引增量,咱们统一为全部业务表再 新增一个字段:_updatetime,类型为bigint,而且同时建立该字段的mysql 索引idx_for_updatetime,建索引的目的是为了快速查找定位。该字段存储前台发送来的对该条招聘职位信息改动(第一次叫建立)的时间戳,记录为自1970.1.1日0时0分0秒到如今经历的毫秒数,其值相似于1594508968000。
接下来,对position 插入最后一列字段为_updatetime,值统一设定为0,mysql语句以下:ALTER TABLE position ADD COLUMN _updatetime BIGINT NOT NULL COMMENT 'updated timestamp in epoch milliseconds' after major_standard;
为_updatetime创建索引,sql执行以下:ALTER TABLE position ADD INDEX idx_for_updatetime(_updatetime) ;
以上表格中除了红色标注“是”的项目须要文本分词以支持全文索引,其余文本不须要分词。大部分中文分词器在检索时选择ik_max_word,在检索时选择分词组件:ik_smart。对应以下:
建索引时分词模块:"analyzer": "ik_max_word" 检索时分词模块: "search_analyzer": "ik_smart"
_updatetime字段es格式为date,format 为epoch_millis。
对于major和city 这两个字段,虽然是文本类型,但他们的取值特色以下:
可见他们都是以逗号分隔的多个部分,每一个部分具备完整意义不能再被细分词。因此咱们对这两个字段单独采用逗号分词器:
根据前节分析,咱们能够很容易写出招聘职位信息表数据的动态索引模板,以下:
{ "index_patterns":[ "collegejob_*" ], "order":0, "settings":{ "number_of_shards":5, "number_of_replicas":1, "analysis":{ "analyzer":{ "comma":{ "pattern":",", "type":"pattern" } } } }, "mappings":{ "_source":{ "enabled":true }, "dynamic":"true", "date_detection":false, "numeric_detection":true, "properties":{ "id":{ "type":"keyword" }, "recruitment_id":{ "type":"keyword" }, "demand_number":{ "type":"text", "index":"true", "analyzer":"ik_max_word", "search_analyzer":"ik_smart", "fields":{ "raw":{ "type":"keyword", "index":true, "ignore_above":1024 } } }, "major":{ "type":"text", "analyzer":"comma", "search_analyzer":"comma" }, "city":{ "type":"text", "analyzer":"comma", "search_analyzer":"comma" }, "updatetime":{ "type":"date", "format":"epoch_millis" } }, "dynamic_templates":[ { "string_fields":{ "match":"*", "match_mapping_type":"string", "mapping":{ "type":"text", "index":"true", "analyzer":"ik_max_word", "search_analyzer":"ik_smart", "fields":{ "raw":{ "type":"keyword", "index":true, "ignore_above":32766 } } } } } ] }, "aliases":{ "{index}-alias":{ } } }
上图是对文字版的动态索引模板的主要内容的简要说明(可能跟文字版有些许出入,以最终的文字版为准)。
新添加动态索引模板的es restfulAPI为:
curl -XPUT http://192.168.0.110:19200/_template/collegejob_template_1?pretty -H 'content-Type:application/json' -d ' {STATEMENT} '
将该语句中的{STATEMENT} 替换为上文 文字版的 动态索引模板内容便可。
执行结果为:
接下来查询刚才新建的索引模板,执行命令:curl -XGET http://192.168.0.110:19200/_template/collegejob_template_1?pretty
可见能查到刚才新建的collegejob_template_1索引模板了。
删除索引模板的命令是:curl -XDELETE http://192.168.0.110:19200/_template/collegejob_template_1?pretty
最后根据这个动态索引模板,咱们建立一个索引名称叫作:collegejob_position_v20200712_1.
注意index名称必需要能匹配到上面咱们已经建立好的动态索引模板中的index_patterns中的值的pattern。
RestfulAPI: curl -XPUT http://192.168.0.110:19200/collegejob_position_v20200712_1?pretty
再次查看索引:curl http://192.168.0.110:19200/_cat/indices?v
查看索引collegejob_position_v20200712_1的mapping:curl http://192.168.0.110:19200/collegejob_position_v20200712_1/_mappings?pretty
可见该index已经被正确关联到咱们预期的mapping上了。
至此,索引和mapping已经建立成功了。接下来开始构建全量索引数据和增量索引数据。
position ---招聘职位表,共计91644条测试数据。为了模拟分别构建全量索引数据和构建增量索引数据的过程,同时也模拟演示生产环境下数据产生与处理的过程,咱们对测试数据作以下处理:
1)全量和增量数据的区分字段是_updatetime(_updatetime字段是个bigint型数据,存储自1970.1.1 0:0:0 至今的毫秒数,要求全部业务数据表都必须有此字段和定义);
2)假定从当前时刻开始建全量索引索引,记录开始建全量索引的当前时刻,记录为S时刻:2020年 07月 12日 星期日 09:51:09 CST, 转换成_updatetime的数据格式为:1594518666324,注意是一个13位长整数。记住这一时刻很重要。后面构建索引时也须要这个分界时间戳S。
3)编写sql语句实现对position 表中一半数据的_updattime设置为早于时刻S的随机位于时间段[1577808000000, S],(1577808000000为2020.1.1) 另外一半数据的_updattime设置为晚于时刻S的随机位于时间段[S,1594522719000]。(1594522719000 为S+1小时时刻)。
Mysql生成在i ≤ R ≤ j 这个范围获得一个随机整数R ,公式为:FLOOR(i + RAND() * (j – i + 1))
结果以下:update position set _updatetime=FLOOR(1577808000000+RAND() * (1594518666324-1577808000000 + 1)) limit 45822 ;
update position set _updatetime=FLOOR(1594522719000+RAND() * (1594522719000-1594518666324 + 1)) where _updatetime=0 ;
接下来查询验证一下更新时间戳早于和晚于时刻S的数据条数:select count(1) from position where _updatetime < 1594518666324 ;
select count(1) from position where _updatetime > 1594518666324 ;
如今表中一半数据早于时刻S,另外一半数据晚于时刻S。
接下来构建mysql2es的全量索引。
在后面章节全量和增量构建索引阶段为了提升构建索引的效率和保证高可用,咱们常常用的一个实现思路是以多机器多进程代替单机单进程。而对于多机器多进程,不可避免涉及多进程之间工做进度的协同,须要用到分布式一致性的工具。建议使用zookeeper。好比分布式锁等。本节迅速简述一下zookeeper的安装部署和使用方法。
注意:安装zookeeper以前须要在每台机器上安装好jdk,建议安装至少jdk1.8及以上版本。本文安装的是jdk1.8。
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
ZooKeeper的目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。
ZooKeeper包含一个简单的原语集,提供Java和C的接口。
ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口,其中分布锁和队列有Java和C两个版本,选举只有Java版本。
1) 原理
ZooKeeper是以Fast Paxos算法为基础的,Paxos 算法存在活锁的问题,即当有多个proposer交错提交时,有可能互相排斥致使没有一个proposer能提交成功,而Fast Paxos做了一些优化,经过选举产生一个leader (领导者),只有leader才能提交proposer,具体算法可见Fast Paxos。所以,要想弄懂ZooKeeper首先得对Fast Paxos有所了解。
ZooKeeper的基本运转流程:
2) 特色
在Zookeeper中,znode是一个跟Unix文件系统路径类似的节点,能够往这个节点存储或获取数据。若是在建立znode时Flag设置为EPHEMERAL,那么当建立这个znode的节点和Zookeeper失去链接后,这个znode将再也不存在在Zookeeper里,Zookeeper使用Watcher察觉事件信息。当客户端接收到事件信息,好比链接超时、节点数据改变、子节点改变,能够调用相应的行为来处理数据。Zookeeper的Wiki页面展现了如何使用Zookeeper来处理事件通知,队列,优先队列,锁,共享锁,可撤销的共享锁,两阶段提交。
那么Zookeeper能作什么事情呢,简单的例子:假设咱们有20个搜索引擎的服务器(每一个负责总索引中的一部分的搜索任务)和一个总服务器(负责向这20个搜索引擎的服务器发出搜索请求并合并结果集),一个备用的总服务器(负责当总服务器宕机时替换总服务器),一个web的cgi(向总服务器发出搜索请求)。搜索引擎的服务器中的15个服务器提供搜索服务,5个服务器正在生成索引。这20个搜索引擎的服务器常常要让正在提供搜索服务的服务器中止提供服务开始生成索引,或生成索引的服务器已经把索引生成完成能够提供搜索服务了。使用Zookeeper能够保证总服务器自动感知有多少提供搜索引擎的服务器并向这些服务器发出搜索请求,当总服务器宕机时自动启用备用的总服务器。
从zookeeper官网https://zookeeper.apache.org/releases.html 下载当前最新版本的zookeeper3.6.1。
分布式zookeeper(简称zk)集群至少要求运行在3台或以上服务器上。本文讲述是基于安装在3台vmware虚拟机上,各虚拟机机器结点以下表:
本节使用的全部vmware虚拟机配置均为CPU:8核,内存6G,硬盘足够。
根据咱们一向部署分布式服务的作法:
1) 先建立zk用户和zk组;
groupadd zk vim /etc/group 会发现最后一行有zk用户组 adduser -g zk zk 建立zk用户,同时加入zk用户组,自动建立zk的homedir为/home/zk vim /etc/passwd 能够看到最后一行是zk用户。 passwd zk 为zk用户新设立密码 将zk用户加入sudo权限 注意:本步骤非必须,可选。 chmod +w /etc/sudoers vim /etc/sudoers 添加以下行:
而后再chmod -w /etc/sudoers
2) 建立zk服务的basedir:/opt/zk
chown -R zk:zk /opt/zk
3) 在/opt/zk下分别建立app data logs temp分别做为zk的app/data/logs/temp 目录。
4) 配置zk
解压zk压缩包文件:apache-zookeeper-3.6.1-bin.tar.gz 到/opt/zk/app目录下:
在cent7a机器上执行:
cd /opt/zk/app/apache-zookeeper-3.6.1-bin/conf mv zoo_sample.cfg zoo.cfg vim zoo.cfg 修改以下:
其中clientPort 2181是客户端链接zk集群的端口,dataDir和dataLogDir分别是数据目录和日志目录。文件最后的3行是用于zk集群互联。
server.A = B:C:D A:zookeeper服务器的序号,即第几号服务器. 注意这个序号要与zookeeper的myid保持一致 B:服务器的 IP 地址 C:服务器跟随者follower与集群中的 Leader 服务器交换信息的端口 D:若是集群中的 Leader 服务器宕机,须要一个端口通讯从新进行选举,选出一个新的 Leader。这个端口就是用来作leader选举的端口
注意server.1/server.2/server.3 中的1/2/3是zk 结点的序号,不一样结点必须不能相同。
直接将此zoo.cfg一行不用修改原样拷贝到cent7b和cent7c机器上相同目录下。
接下来在cent7a的datadir即:/opt/zk/data下新建立myid文件,并写入1:
一样地,在cent7b的datadir即:/opt/zk/data下新建立myid文件,并写入2:
在cent7c的datadir即:/opt/zk/data下新建立myid文件,并写入3:
至此,zk集群配置结束。启动zk集群以前不要忘记开放3台机器上2181/2888/3888 三个端口:
systemctl start firewalld firewall-cmd --zone=public --add-port=2181/tcp --permanent firewall-cmd --zone=public --add-port=2888/tcp --permanent firewall-cmd --zone=public --add-port=3888/tcp --permanent firewall-cmd --reload
Zk集群主要操做命令以下:
在全部机器上执行:/opt/zk/app/apache-zookeeper-3.6.1-bin/bin/zkServer.sh start/stop/status/restart ##启动/中止/查询状态/重启 zk服务
可见zk集群成功,1个leader和2个follower。
在全部机器上执行:/opt/zk/app/apache-zookeeper-3.6.1-bin/bin/zkCli.sh ##链接本地服务器,默认是2181端口
/opt/zk/app/apache-zookeeper-3.6.1-bin/bin/zkCli.sh -server ip:port ##链接指定zk服务器和端口
ZooKeeper是经过客户端脚原本操做的。客户端脚本:zkCli.sh,存放在ZooKeeper的bin目录下。
默认链接本地的ZooKeeper服务器:#zkCli.sh
链接指定的ZooKeeper服务器:#zkCli.sh –server Server IP:port
在cent7a上运行:/opt/zk/app/apache-zookeeper-3.6.1-bin/bin/zkCli.sh -server 192.168.0.112:2181
,显示以下:
执行:/opt/zk/app/apache-zookeeper-3.6.1-bin/bin/zkCli.sh -server 192.168.0.112:2181
此时进入zookeeper系统的交互模式。
此时键入h或help命令,能够看到交互模式下支持的命令选项,以下图:
命令行工具的一些简单操做以下:
1) 显示根目录下、文件: ls / 使用 ls 命令来查看当前 ZooKeeper 中所包含的内容 2) 显示根目录下、文件: ls2 / 查看当前节点数据并能看到更新次数等数据 3) 建立文件,并设置初始内容: create /zk "test" 建立一个新的 znode节点“ zk ”以及与它关联的字符串 4) 获取文件内容: get /zk 确认 znode 是否包含咱们所建立的字符串 5) 修改文件内容: set /zk "zkbak" 对 zk 所关联的字符串进行设置 6) 删除文件: delete /zk 将刚才建立的 znode 删除 7) 退出客户端: quit 8) 帮助命令: help
Zookeeper提供了丰富的java api 。后续可直接在附件的java工程中详见。
Zookeeper 有不少可视化工具,其中一个轻便易用的工具是ZooInspector. 程序包是:ZooInspector.zip (该文件附于项目交付清单中) 。解压后 直接在windows上双击 ZooInspector/build/ zookeeper-dev-ZooInspector.jar 便可打开图形界面以下:
键入上面搭建好的zookeeper集群192.168.0.112:2181 便可进入可视化界面:
构建从mysql2es的全量索引数据基于一个基本的假设:_updatetime字段值只会不变或增大,永远不会减少。
构建全量索引之初,咱们锁定当前时刻,即上节中的时刻S。咱们要作的就是编程实现select 表中全部_updattime <= 时刻S的数据,而后add进入到es索引collegejob_position_v20200712_1(前节已经建立好)中去便可。
实现上述编程逻辑并不十分困难,但仍然须要商榷如下细节:
1) 若是全量数据规模十分庞大,单进程程序进行mysql选择数据而且构建全量索引到ES的过程可能会比较耗时,若是提升效率?可采用的方案是多进程实现。能够单击多进程,也能够多机多进程。固然,若是你以为你的数据量小,不必多进程时,咱们的方案也能够适应。以上就是咱们具体构建全量索引的重要原则:多进程构建全量索引。
2) 多进程实现时首先会遇到多个进程同时select mysql库,如何解决数据冲突? 要避免进程A已经select 出去的数据绝对不能被进程B再次select!如何解决?
解决方案是:分布式锁(Distributed locks)。分布式锁的主流实现有不少,本文采用zookeeper实现。关于zookeeper集群安装部署和简单实用本文前节中已经叙述过。
3) 最后一个细节须要知道的是,考虑到全量构建索引数据规模可能很大,全量阶段向es添加文档构建索引采用ES提供的bulk API,以提升构建效率。有关es bulk API更详细的说明可参考:https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html 。
构建全量和增量索引数据的java编程实现工程都在附件文件:esproj.zip。其项目工程目录以下图:
配置文件:
下面重点解释一下esbuilder-config.properties文件的细节:
编译和运行方法:
在主目录下执行:mvn clean install -U 便可。会生成:
1)lib目录; 2)config目录; 3)esbuilder.jar。
其中lib是依赖库,config是包含esbuilder-config.properties等所有须要的配置文件。Esbuilder.jar是最终的jar包。
运行时,必须保证以上3个目录和文件在同一个目录下。
执行 java -cp esbuilder.jar com.freedom.es.es.main. EsFullBuilder
便可。
由于支持多机器分布式部署,建议使用时,在多个机器上同时运行。
本文时间是在cent7a cent7b cent7c 3台机器上同时运行上述命令,执行以下:
1) 初始:
初始时由上图可见,es中collegejob_position_v20200712_1索引中文档数量为0.
由上图可见,初始时zk目录为空。(zookeepr目录忽略)。
2) 如上图,下面在3个机器上同时执行java -cp esbuilder.jar com.freedom.es.es.main.EsFullBuilder 命令,结果以下:
由上图可见,3个EsFullBuilder进程在3台机器上同时并行执行 建全量索引的过程。多台机器并行执行可显著提升构建索引效率,尤为是对于大规模数据更为明显和必要。
上图是执行过程当中zk上树节点的图。能看到每一个EsFullBuider会以本身所在机器IP:workdir的名称注册到zk上nodes节点下。data节点存放数据库_updatetime时间戳依次被处理后的剩余时间段范围。
3) 几分钟之后,执行结束。经过下面日志可到3个进程都把能从数据库获取的数据都获取了。执行结果去zk上看,以下:
全部的实例心跳结点都已经消失了,说明所有进程都退出了。每一个实例上面的内容都是SUCCESS,代表所有都成功了。
再看下es上索引状况:
可见文档数量已是45822条了。
再去数据库上查询一下:
数据库查询得出的记录条数也是45822,与ES索引文档数量一致,可见全量索引构建正确。
至此,全量索引构建过程所有结束。接下来到了构建增量实时索引过程。
构建增量索引是相对于构建全量索引而言的,通常指响应线上服务实时增长的文档流数据而构建的实时索引。
增量构建索引的数据源是一张mysql表,表示线上依次发生的实时流水数据,假定这张表的名称后缀都含有_instflow(实时流水的意思)。
对应招聘职位这一业务场景,以下建立其流水表:
CREATE TABLE `position_instflow` ( `id` varchar(64) NOT NULL COMMENT '文档id', `doctype` int(11) NOT NULL COMMENT '文档类型,增删改分别对应数字0,1,2', `sequenceby3` int(11) NOT NULL COMMENT '文档id相对于总数3的序号', `sequenceby5` int(11) NOT NULL COMMENT '文档id相对于总数5的序号', `sequenceby7` int(11) NOT NULL COMMENT '文档id相对于总数7的序号', `_updatetime` bigint(20) NOT NULL COMMENT 'updated timestamp in epoch milliseconds', `handled` int(11) NOT NULL COMMENT '该文档是否被处理过', PRIMARY KEY (`id`) USING BTREE, KEY `idx_for_sequenceby3` (`sequenceby3`), KEY `idx_for_sequenceby5` (`sequenceby5`), KEY `idx_for_sequenceby7` (`sequenceby7`), KEY `idx_for_updatetime` (`_updatetime`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='招聘职位信息流水表';
其中:
若是说按照文档id分段是显著提升了实时索引构建效率,引入一个id段内支持多机器分布式多进程EsInstBuilder的策略,则显著提升了实时索引构建系统的安全性和可靠性。由于每一个id段有多个EsInstBuilder进程,且都分布在多个机器上,若是其中的1个进程挂了,还有其它进程,并不影响实时构建流程。这两个策略是咱们本文实时构建索引系统设计的精华和灵魂,也是亮点。
由于咱们只存储了idx_for_sequenceby三、idx_for_sequenceby五、idx_for_sequenceby7,即代表咱们目前只但愿支持id分3/5/7段,更多的其余分段方案只须要修改流水表便可支持。本文实践以id分3段演示。
增量构建索引过程的配置文件仍然是同全量构建索引一个文件,如上图所示。
各参数解释以下:
上图是构建es索引的java 工程示意图,EsInstBuilder是主类和入口。Esbuilder-config.properties同上文所述,是配置文件。
编译方法仍是mvn clean install -U 。
程序运行方法是:仍然将config、lib、esbuider.jar 3个文件或目录平行放置在某个目录下。而后运行java -cp esbuilder.jar com.freedom.es.es.main. EsInstBuilder 便可。
接下来,咱们以id分3段,即es.instbuilder.buildercount设置为3,es.instbuilder.buildersequence分别为0,1,2。注意,同一个id段内咱们设置了2个进程,共同消费同一个id段内的实时文档。部署分布以下图所示:
上图可见3个分段,buildersequence分别要设置成为0,1,2,同一个分段内有2个进程。
再看看zk上树形结构分布以下图所示:
目前是空的。
再看看ES索引文档状况,以下图:
这仍是以前构建彻底量索引后的45822篇文档。
目前流水表是空的。
接下来,咱们分别在3个机上的以命令:java -cp esbuilder.jar com.freedom.es.es.main. EsInstBuilder 启动6个EsInstBuilder进程,以下图:
可见6个进程中,同属同一id分段内的2个进程在同时争抢1把分布式锁,得到锁的进程就去消费流水表中的流水数据,往ES中新建实时索引。
再看看这时候的zk上树形结点:
由上图可见,zk上目前有6个节点表明每一个进程的心跳结点,在分别共同争抢3把锁,有序地进行着消费流水数据的过程。由于目前流水表中是空的,因此6个进程就在空转。
接下来为了模拟线上实时数据,咱们利用java工程中的com.freedom.es.util.TmpInstImportData主类的功能来从全量数据表中抽取全部_updatetime>1594518666324时刻S的全部文档,动态每一个40毫秒插入一条到实时流水表中:jyfw_recruitment_position_instflow。
执行java -cp esbuilder.jar com.freedom.es.util. TmpInstImportData ,它会每40毫秒插入一条实时数据到流水表中,以下图所示:
可见这边在插入流水数据过程当中了。
再看看EsInstBuilder这边的日志,以下图:
可见都分别在有序地消费流水数据了。注意图中的耗时10秒是程序为了演示过程特意sleep 10秒每一轮,实际中消费流水速度都是很是快的。
再看看流水表中:
流水表中的handled字段为0表示没有被消费,为1表示已经被消费了。图中所有为1可见这些流水数据都被建入到ES索引中去了。
再看看此时ES索引中文档的状况,以下图所示:
可见es索引中文档数据在增长,从45822已经增长到了52481,这个数字还在不断增大。
生产环境下,上述6个EsInstBuilder要一直启动着,永远不能中止,以保证远远不断消费线上实时来的流水数据,再构建到ES索引中去。这个过程是没有尽头的。
至此,构建增量索引过程已经完毕。接下来,也是最后,咱们给出一个java api 查询es服务的API demo,供生产环境使用。
Es提供了丰富的java API 用于搜索查询。上图是java工程中给出的一个简单的java API demo,它展现了检索 position_description中含有“天然语言”这个词的检索召回结果。可见一共有匹配52条,要求召回了top10条。
更详细丰富的ES java search API使用方法,可根据须要参见官方ES文档:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_search_apis.html